Skip to content

BaseTeam

Sub classes

Name Children Inherits
Team
llmling_agent.delegation.team
Group of agents that can execute together.
    TeamRun
    llmling_agent.delegation.teamrun
    Handles team operations with monitoring.

      Base classes

      Name Children Inherits
      MessageNode
      llmling_agent.messaging.messagenode
      Base class for all message processing nodes.
      Generic
      typing
      Abstract base class for generic types.

      ⋔ Inheritance diagram

      graph TD
        94111465490208["base_team.BaseTeam"]
        94111465455024["messagenode.MessageNode"]
        94111465799152["messageemitter.MessageEmitter"]
        94111462661808["tasks.TaskManagerMixin"]
        139887694254272["builtins.object"]
        94111414070624["abc.ABC"]
        94111413919344["typing.Generic"]
        94111465455024 --> 94111465490208
        94111465799152 --> 94111465455024
        94111462661808 --> 94111465799152
        139887694254272 --> 94111462661808
        94111414070624 --> 94111465799152
        139887694254272 --> 94111414070624
        94111413919344 --> 94111465799152
        139887694254272 --> 94111413919344
        94111413919344 --> 94111465455024
        94111413919344 --> 94111465490208

      🛈 DocStrings

      Bases: MessageNode[TDeps, TResult]

      Base class for Team and TeamRun.

      Source code in src/llmling_agent/delegation/base_team.py
       74
       75
       76
       77
       78
       79
       80
       81
       82
       83
       84
       85
       86
       87
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      169
      170
      171
      172
      173
      174
      175
      176
      177
      178
      179
      180
      181
      182
      183
      184
      185
      186
      187
      188
      189
      190
      191
      192
      193
      194
      195
      196
      197
      198
      199
      200
      201
      202
      203
      204
      205
      206
      207
      208
      209
      210
      211
      212
      213
      214
      215
      216
      217
      218
      219
      220
      221
      222
      223
      224
      225
      226
      227
      228
      229
      230
      231
      232
      233
      234
      235
      236
      237
      238
      239
      240
      241
      242
      243
      244
      245
      246
      247
      248
      249
      250
      251
      252
      253
      254
      255
      256
      257
      258
      259
      260
      261
      262
      263
      264
      265
      266
      267
      268
      269
      270
      271
      272
      273
      274
      275
      276
      277
      278
      279
      280
      281
      282
      283
      284
      285
      286
      287
      288
      289
      290
      291
      292
      293
      294
      295
      296
      297
      298
      299
      300
      301
      302
      303
      304
      305
      306
      307
      308
      309
      310
      311
      312
      313
      314
      315
      316
      317
      318
      319
      320
      321
      322
      323
      324
      325
      326
      327
      328
      329
      330
      331
      332
      333
      334
      335
      336
      337
      338
      339
      340
      341
      342
      343
      344
      345
      346
      347
      348
      349
      350
      351
      352
      353
      354
      355
      356
      357
      358
      359
      360
      361
      362
      363
      364
      365
      366
      367
      368
      369
      370
      371
      372
      373
      374
      375
      376
      377
      378
      379
      380
      381
      382
      383
      384
      385
      386
      387
      388
      389
      390
      391
      392
      393
      394
      395
      396
      397
      398
      399
      400
      401
      402
      403
      404
      405
      406
      407
      408
      409
      410
      411
      412
      413
      414
      415
      416
      417
      418
      419
      420
      421
      422
      423
      424
      425
      426
      427
      428
      429
      430
      431
      432
      433
      434
      435
      436
      437
      438
      439
      440
      441
      442
      443
      444
      445
      446
      447
      448
      449
      450
      451
      452
      453
      454
      455
      456
      457
      458
      459
      460
      461
      462
      463
      464
      465
      466
      467
      468
      469
      470
      471
      472
      473
      474
      475
      476
      477
      478
      479
      480
      481
      482
      483
      484
      485
      486
      487
      488
      489
      490
      491
      492
      493
      494
      495
      496
      497
      498
      499
      500
      501
      502
      503
      504
      505
      506
      507
      508
      509
      510
      511
      512
      513
      514
      515
      516
      517
      518
      519
      520
      521
      522
      523
      524
      525
      526
      527
      528
      529
      530
      531
      532
      533
      534
      535
      536
      537
      538
      539
      540
      541
      542
      543
      544
      545
      class BaseTeam[TDeps, TResult](MessageNode[TDeps, TResult]):
          """Base class for Team and TeamRun."""
      
          def __init__(
              self,
              agents: Sequence[MessageNode[TDeps, TResult]],
              *,
              name: str | None = None,
              description: str | None = None,
              shared_prompt: str | None = None,
              mcp_servers: list[str | MCPServerConfig] | None = None,
              picker: AnyAgent[Any, Any] | None = None,
              num_picks: int | None = None,
              pick_prompt: str | None = None,
          ):
              """Common variables only for typing."""
              from llmling_agent.delegation.teamrun import ExtendedTeamTalk
      
              self._name = name or " & ".join([i.name for i in agents])
              self.agents = EventedList[MessageNode]()
              self.agents.events.inserted.connect(self._on_node_added)
              self.agents.events.removed.connect(self._on_node_removed)
              self.agents.events.changed.connect(self._on_node_changed)
              super().__init__(
                  name=self._name,
                  context=self.context,
                  mcp_servers=mcp_servers,
                  description=description,
              )
              self.agents.extend(list(agents))
              self._team_talk = ExtendedTeamTalk()
              self.shared_prompt = shared_prompt
              self._main_task: asyncio.Task[Any] | None = None
              self._infinite = False
              self.picker = picker
              self.num_picks = num_picks
              self.pick_prompt = pick_prompt
      
          def to_tool(self, *, name: str | None = None, description: str | None = None) -> Tool:
              """Create a tool from this agent.
      
              Args:
                  name: Optional tool name override
                  description: Optional tool description override
              """
              tool_name = name or f"ask_{self.name}"
      
              async def wrapped_tool(prompt: str) -> TResult:
                  result = await self.run(prompt)
                  return result.data
      
              docstring = description or f"Get expert answer from node {self.name}"
              if self.description:
                  docstring = f"{docstring}\n\n{self.description}"
      
              wrapped_tool.__doc__ = docstring
              wrapped_tool.__name__ = tool_name
      
              return Tool.from_callable(
                  wrapped_tool,
                  name_override=tool_name,
                  description_override=docstring,
              )
      
          async def pick_agents(self, task: str) -> Sequence[MessageNode[Any, Any]]:
              """Pick agents to run."""
              if self.picker:
                  if self.num_picks == 1:
                      result = await self.picker.talk.pick(self, task, self.pick_prompt)
                      return [result.selection]
                  result = await self.picker.talk.pick_multiple(
                      self,
                      task,
                      min_picks=self.num_picks or 1,
                      max_picks=self.num_picks,
                      prompt=self.pick_prompt,
                  )
                  return result.selections
              return list(self.agents)
      
          def _on_node_changed(self, index: int, old: MessageNode, new: MessageNode):
              """Handle node replacement in the agents list."""
              self._on_node_removed(index, old)
              self._on_node_added(index, new)
      
          def _on_node_added(self, index: int, node: MessageNode[Any, Any]):
              """Handler for adding nodes to the team."""
              from llmling_agent.agent import Agent, StructuredAgent
      
              if isinstance(node, Agent | StructuredAgent):
                  node.tools.add_provider(self.mcp)
              # TODO: Right now connecting here is not desired since emission means db logging
              # ideally db logging would not rely on the "public" agent signal.
      
              # node.tool_used.connect(self.tool_used)
      
          def _on_node_removed(self, index: int, node: MessageNode[Any, Any]):
              """Handler for removing nodes from the team."""
              from llmling_agent.agent import Agent, StructuredAgent
      
              if isinstance(node, Agent | StructuredAgent):
                  node.tools.remove_provider(self.mcp)
              # node.tool_used.disconnect(self.tool_used)
      
          def __repr__(self) -> str:
              """Create readable representation."""
              members = ", ".join(agent.name for agent in self.agents)
              name = f" ({self.name})" if self.name else ""
              return f"{self.__class__.__name__}[{len(self.agents)}]{name}: {members}"
      
          def __len__(self) -> int:
              """Get number of team members."""
              return len(self.agents)
      
          def __iter__(self) -> Iterator[MessageNode[TDeps, TResult]]:
              """Iterate over team members."""
              return iter(self.agents)
      
          def __getitem__(self, index_or_name: int | str) -> MessageNode[TDeps, TResult]:
              """Get team member by index or name."""
              if isinstance(index_or_name, str):
                  return next(agent for agent in self.agents if agent.name == index_or_name)
              return self.agents[index_or_name]
      
          def __or__(
              self,
              other: AnyAgent[Any, Any] | ProcessorCallback[Any] | BaseTeam[Any, Any],
          ) -> TeamRun[Any, Any]:
              """Create a sequential pipeline."""
              from llmling_agent.agent import Agent, StructuredAgent
              from llmling_agent.delegation.teamrun import TeamRun
      
              # Handle conversion of callables first
              if callable(other):
                  if has_return_type(other, str):
                      other = Agent.from_callback(other)
                  else:
                      other = StructuredAgent.from_callback(other)
                  other.context.pool = self.context.pool
      
              # If we're already a TeamRun, extend it
              if isinstance(self, TeamRun):
                  if self.validator:
                      # If we have a validator, create new TeamRun to preserve validation
                      return TeamRun([self, other])
                  self.agents.append(other)
                  return self
              # Otherwise create new TeamRun
              return TeamRun([self, other])
      
          @overload
          def __and__(self, other: Team[None]) -> Team[None]: ...
      
          @overload
          def __and__(self, other: Team[TDeps]) -> Team[TDeps]: ...
      
          @overload
          def __and__(self, other: Team[Any]) -> Team[Any]: ...
      
          @overload
          def __and__(self, other: AnyAgent[TDeps, Any]) -> Team[TDeps]: ...
      
          @overload
          def __and__(self, other: AnyAgent[Any, Any]) -> Team[Any]: ...
      
          def __and__(
              self, other: Team[Any] | AnyAgent[Any, Any] | ProcessorCallback[Any]
          ) -> Team[Any]:
              """Combine teams, preserving type safety for same types."""
              from llmling_agent.agent import Agent, StructuredAgent
              from llmling_agent.delegation.team import Team
      
              if callable(other):
                  if has_return_type(other, str):
                      other = Agent.from_callback(other)
                  else:
                      other = StructuredAgent.from_callback(other)
                  other.context.pool = self.context.pool
      
              match other:
                  case Team():
                      # Flatten when combining Teams
                      return Team([*self.agents, *other.agents])
                  case _:
                      # Everything else just becomes a member
                      return Team([*self.agents, other])
      
          @property
          def stats(self) -> AggregatedMessageStats:
              """Get aggregated stats from all team members."""
              return AggregatedMessageStats(stats=[agent.stats for agent in self.agents])
      
          @property
          def is_running(self) -> bool:
              """Whether execution is currently running."""
              return bool(self._main_task and not self._main_task.done())
      
          def is_busy(self) -> bool:
              """Check if team is processing any tasks."""
              return bool(self._pending_tasks or self._main_task)
      
          async def stop(self):
              """Stop background execution if running."""
              if self._main_task and not self._main_task.done():
                  self._main_task.cancel()
                  await self._main_task
              self._main_task = None
              await self.cleanup_tasks()
      
          async def wait(self) -> ChatMessage[Any] | None:
              """Wait for background execution to complete and return last message."""
              if not self._main_task:
                  msg = "No execution running"
                  raise RuntimeError(msg)
              if self._infinite:
                  msg = "Cannot wait on infinite execution"
                  raise RuntimeError(msg)
              try:
                  return await self._main_task
              finally:
                  await self.cleanup_tasks()
                  self._main_task = None
      
          async def run_in_background(
              self,
              *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
              max_count: int | None = 1,  # 1 = single execution, None = indefinite
              interval: float = 1.0,
              **kwargs: Any,
          ) -> ExtendedTeamTalk:
              """Start execution in background.
      
              Args:
                  prompts: Prompts to execute
                  max_count: Maximum number of executions (None = run indefinitely)
                  interval: Seconds between executions
                  **kwargs: Additional args for execute()
              """
              if self._main_task:
                  msg = "Execution already running"
                  raise RuntimeError(msg)
              self._infinite = max_count is None
      
              async def _continuous() -> ChatMessage[Any] | None:
                  count = 0
                  last_message = None
                  while max_count is None or count < max_count:
                      try:
                          result = await self.execute(*prompts, **kwargs)
                          last_message = result[-1].message if result else None
                          count += 1
                          if max_count is None or count < max_count:
                              await asyncio.sleep(interval)
                      except asyncio.CancelledError:
                          logger.debug("Background execution cancelled")
                          break
                  return last_message
      
              self._main_task = self.create_task(_continuous(), name="main_execution")
              return self._team_talk
      
          @property
          def execution_stats(self) -> AggregatedTalkStats:
              """Get current execution statistics."""
              return self._team_talk.stats
      
          @property
          def talk(self) -> ExtendedTeamTalk:
              """Get current connection."""
              return self._team_talk
      
          @property
          def events(self) -> ListEvents:
              """Get events for the team."""
              return self.agents.events
      
          async def cancel(self):
              """Cancel execution and cleanup."""
              if self._main_task:
                  self._main_task.cancel()
              await self.cleanup_tasks()
      
          def get_structure_diagram(self) -> str:
              """Generate mermaid flowchart of node hierarchy."""
              lines = ["flowchart TD"]
      
              def add_node(node: MessageNode[Any, Any], parent: str | None = None):
                  """Recursively add node and its members to diagram."""
                  node_id = f"node_{id(node)}"
                  lines.append(f"    {node_id}[{node.name}]")
                  if parent:
                      lines.append(f"    {parent} --> {node_id}")
      
                  # If it's a team, recursively add its members
                  from llmling_agent.delegation.base_team import BaseTeam
      
                  if isinstance(node, BaseTeam):
                      for member in node.agents:
                          add_node(member, node_id)
      
              # Start with root nodes (team members)
              for node in self.agents:
                  add_node(node)
      
              return "\n".join(lines)
      
          def iter_agents(self) -> Iterator[AnyAgent[Any, Any]]:
              """Recursively iterate over all child agents."""
              from llmling_agent.agent import Agent, StructuredAgent
      
              for node in self.agents:
                  match node:
                      case BaseTeam():
                          yield from node.iter_agents()
                      case Agent() | StructuredAgent():
                          yield node
                      case _:
                          msg = f"Invalid node type: {type(node)}"
                          raise ValueError(msg)
      
          @property
          def context(self) -> TeamContext:
              """Get shared pool from team members.
      
              Raises:
                  ValueError: If team members belong to different pools
              """
              from llmling_agent.delegation.team import Team
      
              pool_ids: set[int] = set()
              shared_pool: AgentPool | None = None
              team_config: TeamConfig | None = None
      
              for agent in self.iter_agents():
                  if agent.context and agent.context.pool:
                      pool_id = id(agent.context.pool)
                      if pool_id not in pool_ids:
                          pool_ids.add(pool_id)
                          shared_pool = agent.context.pool
                          if shared_pool.manifest.teams:
                              team_config = shared_pool.manifest.teams.get(self.name)
              if not team_config:
                  mode = "parallel" if isinstance(self, Team) else "sequential"
                  team_config = TeamConfig(name=self.name, mode=mode, members=[])
              if not pool_ids:
                  logger.info("No pool found for team %s.", self.name)
                  return TeamContext(
                      node_name=self.name,
                      pool=shared_pool,
                      config=team_config,
                      definition=shared_pool.manifest if shared_pool else AgentsManifest(),
                  )
      
              if len(pool_ids) > 1:
                  msg = f"Team members in {self.name} belong to different pools"
                  raise ValueError(msg)
              return TeamContext(
                  node_name=self.name,
                  pool=shared_pool,
                  config=team_config,
                  definition=shared_pool.manifest if shared_pool else AgentsManifest(),
              )
      
          @context.setter
          def context(self, value: NodeContext):
              msg = "Cannot set context on BaseTeam"
              raise RuntimeError(msg)
      
          async def distribute(
              self,
              content: str,
              *,
              tools: list[str] | None = None,
              resources: list[str] | None = None,
              metadata: dict[str, Any] | None = None,
          ):
              """Distribute content and capabilities to all team members."""
              for agent in self.iter_agents():
                  # Add context message
                  agent.conversation.add_context_message(
                      content, source="distribution", metadata=metadata
                  )
      
                  # Register tools if provided
                  if tools:
                      for tool in tools:
                          agent.tools.register_tool(tool)
      
                  # Load resources if provided
                  if resources:
                      for resource in resources:
                          await agent.conversation.load_context_source(resource)
      
          @asynccontextmanager
          async def temporary_state(
              self,
              *,
              system_prompts: list[AnyPromptType] | None = None,
              replace_prompts: bool = False,
              tools: list[ToolType] | None = None,
              replace_tools: bool = False,
              history: list[AnyPromptType] | SessionQuery | None = None,
              replace_history: bool = False,
              pause_routing: bool = False,
              model: ModelType | None = None,
              provider: AgentProvider | None = None,
          ) -> AsyncIterator[Self]:
              """Temporarily modify state of all agents in the team.
      
              All agents in the team will enter their temporary state simultaneously.
      
              Args:
                  system_prompts: Temporary system prompts to use
                  replace_prompts: Whether to replace existing prompts
                  tools: Temporary tools to make available
                  replace_tools: Whether to replace existing tools
                  history: Conversation history (prompts or query)
                  replace_history: Whether to replace existing history
                  pause_routing: Whether to pause message routing
                  model: Temporary model override
                  provider: Temporary provider override
              """
              # Get all agents (flattened) before entering context
              agents = list(self.iter_agents())
      
              async with AsyncExitStack() as stack:
                  if pause_routing:
                      await stack.enter_async_context(self.connections.paused_routing())
                  # Enter temporary state for all agents
                  for agent in agents:
                      await stack.enter_async_context(
                          agent.temporary_state(
                              system_prompts=system_prompts,
                              replace_prompts=replace_prompts,
                              tools=tools,
                              replace_tools=replace_tools,
                              history=history,
                              replace_history=replace_history,
                              pause_routing=pause_routing,
                              model=model,
                              provider=provider,
                          )
                      )
                  try:
                      yield self
                  finally:
                      # AsyncExitStack will handle cleanup of all states
                      pass
      
          @abstractmethod
          async def execute(
              self,
              *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
              **kwargs: Any,
          ) -> TeamResponse: ...
      
          def run_sync(
              self,
              *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str],
              store_history: bool = True,
          ) -> ChatMessage[TResult]:
              """Run agent synchronously (convenience wrapper).
      
              Args:
                  prompt: User query or instruction
                  store_history: Whether the message exchange should be added to the
                                 context window
              Returns:
                  Result containing response and run information
              """
              coro = self.run(*prompt, store_history=store_history)
              return self.run_task_sync(coro)
      

      context property writable

      context: TeamContext
      

      Get shared pool from team members.

      Raises:

      Type Description
      ValueError

      If team members belong to different pools

      events property

      events: ListEvents
      

      Get events for the team.

      execution_stats property

      execution_stats: AggregatedTalkStats
      

      Get current execution statistics.

      is_running property

      is_running: bool
      

      Whether execution is currently running.

      stats property

      Get aggregated stats from all team members.

      talk property

      Get current connection.

      __and__

      __and__(other: Team[None]) -> Team[None]
      
      __and__(other: Team[TDeps]) -> Team[TDeps]
      
      __and__(other: Team[Any]) -> Team[Any]
      
      __and__(other: AnyAgent[TDeps, Any]) -> Team[TDeps]
      
      __and__(other: AnyAgent[Any, Any]) -> Team[Any]
      
      __and__(other: Team[Any] | AnyAgent[Any, Any] | ProcessorCallback[Any]) -> Team[Any]
      

      Combine teams, preserving type safety for same types.

      Source code in src/llmling_agent/delegation/base_team.py
      239
      240
      241
      242
      243
      244
      245
      246
      247
      248
      249
      250
      251
      252
      253
      254
      255
      256
      257
      258
      259
      def __and__(
          self, other: Team[Any] | AnyAgent[Any, Any] | ProcessorCallback[Any]
      ) -> Team[Any]:
          """Build a new Team from this team's members plus ``other``.

          A callable is first wrapped into an agent that shares this team's pool.
          If ``other`` is itself a Team, its members are flattened into the result.
          """
          from llmling_agent.agent import Agent, StructuredAgent
          from llmling_agent.delegation.team import Team

          if callable(other):
              # str-returning callbacks become plain agents, all others structured ones
              agent_cls = Agent if has_return_type(other, str) else StructuredAgent
              other = agent_cls.from_callback(other)
              other.context.pool = self.context.pool

          # Teams contribute their members; anything else is a single new member
          additions = other.agents if isinstance(other, Team) else [other]
          return Team([*self.agents, *additions])
      

      __getitem__

      __getitem__(index_or_name: int | str) -> MessageNode[TDeps, TResult]
      

      Get team member by index or name.

      Source code in src/llmling_agent/delegation/base_team.py
      192
      193
      194
      195
      196
      def __getitem__(self, index_or_name: int | str) -> MessageNode[TDeps, TResult]:
          """Get team member by index or name.

          Args:
              index_or_name: Position in the member list, or a member name

          Raises:
              KeyError: If no member has the given name
          """
          if isinstance(index_or_name, str):
              # Explicit loop instead of next(): a bare next() would leak
              # StopIteration to the caller when the name is unknown.
              for agent in self.agents:
                  if agent.name == index_or_name:
                      return agent
              msg = f"No team member named {index_or_name!r}"
              raise KeyError(msg)
          return self.agents[index_or_name]
      

      __init__

      __init__(
          agents: Sequence[MessageNode[TDeps, TResult]],
          *,
          name: str | None = None,
          description: str | None = None,
          shared_prompt: str | None = None,
          mcp_servers: list[str | MCPServerConfig] | None = None,
          picker: AnyAgent[Any, Any] | None = None,
          num_picks: int | None = None,
          pick_prompt: str | None = None,
      )
      

      Initialize the state shared by Team and TeamRun (member list, team talk, and picker settings).

      Source code in src/llmling_agent/delegation/base_team.py
       77
       78
       79
       80
       81
       82
       83
       84
       85
       86
       87
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      def __init__(
          self,
          agents: Sequence[MessageNode[TDeps, TResult]],
          *,
          name: str | None = None,
          description: str | None = None,
          shared_prompt: str | None = None,
          mcp_servers: list[str | MCPServerConfig] | None = None,
          picker: AnyAgent[Any, Any] | None = None,
          num_picks: int | None = None,
          pick_prompt: str | None = None,
      ):
          """Initialize the state shared by Team and TeamRun.

          Args:
              agents: Nodes that become the members of this team
              name: Team name; defaults to the member names joined by " & "
              description: Description forwarded to the base node
              shared_prompt: Prompt stored on the team as ``shared_prompt``
              mcp_servers: MCP servers forwarded to the base node
              picker: Optional agent stored as ``picker``
              num_picks: Stored as ``num_picks``
              pick_prompt: Stored as ``pick_prompt``
          """
          from llmling_agent.delegation.teamrun import ExtendedTeamTalk

          self._name = name or " & ".join([i.name for i in agents])
          # Connect the list handlers before any member is inserted
          self.agents = EventedList[MessageNode]()
          self.agents.events.inserted.connect(self._on_node_added)
          self.agents.events.removed.connect(self._on_node_removed)
          self.agents.events.changed.connect(self._on_node_changed)
          # NOTE: ``self.context`` is evaluated while ``self.agents`` is still empty
          super().__init__(
              name=self._name,
              context=self.context,
              mcp_servers=mcp_servers,
              description=description,
          )
          # Members are added only after the base initialisation has run
          self.agents.extend(list(agents))
          self._team_talk = ExtendedTeamTalk()
          self.shared_prompt = shared_prompt
          # Background execution state: no task yet, not an infinite run
          self._main_task: asyncio.Task[Any] | None = None
          self._infinite = False
          self.picker = picker
          self.num_picks = num_picks
          self.pick_prompt = pick_prompt
      

      __iter__

      __iter__() -> Iterator[MessageNode[TDeps, TResult]]
      

      Iterate over team members.

      Source code in src/llmling_agent/delegation/base_team.py
      188
      189
      190
      def __iter__(self) -> Iterator[MessageNode[TDeps, TResult]]:
          """Return an iterator over the nodes in the member list."""
          members = self.agents
          return iter(members)
      

      __len__

      __len__() -> int
      

      Get number of team members.

      Source code in src/llmling_agent/delegation/base_team.py
      184
      185
      186
      def __len__(self) -> int:
          """Return how many direct members this team has."""
          member_count = len(self.agents)
          return member_count
      

      __or__

      __or__(
          other: AnyAgent[Any, Any] | ProcessorCallback[Any] | BaseTeam[Any, Any],
      ) -> TeamRun[Any, Any]
      

      Create a sequential pipeline.

      Source code in src/llmling_agent/delegation/base_team.py
      198
      199
      200
      201
      202
      203
      204
      205
      206
      207
      208
      209
      210
      211
      212
      213
      214
      215
      216
      217
      218
      219
      220
      221
      222
      def __or__(
          self,
          other: AnyAgent[Any, Any] | ProcessorCallback[Any] | BaseTeam[Any, Any],
      ) -> TeamRun[Any, Any]:
          """Chain this node with ``other`` into a sequential pipeline.

          A callable is wrapped into an agent first. A ``TeamRun`` without a
          validator is extended in place and returned; in every other case a
          new ``TeamRun`` containing ``self`` and ``other`` is created.
          """
          from llmling_agent.agent import Agent, StructuredAgent
          from llmling_agent.delegation.teamrun import TeamRun

          if callable(other):
              # Callbacks returning str become plain agents, all others
              # become structured agents
              agent_cls = Agent if has_return_type(other, str) else StructuredAgent
              other = agent_cls.from_callback(other)
              other.context.pool = self.context.pool

          # Only a validator-free TeamRun may be extended in place; with a
          # validator a wrapping TeamRun is created to preserve validation
          if isinstance(self, TeamRun) and not self.validator:
              self.agents.append(other)
              return self
          return TeamRun([self, other])
      

      __repr__

      __repr__() -> str
      

      Create readable representation.

      Source code in src/llmling_agent/delegation/base_team.py
      178
      179
      180
      181
      182
      def __repr__(self) -> str:
          """Build a string of the form ``Class[count] (name): member, member``."""
          member_names = ", ".join(node.name for node in self.agents)
          # The name part is left out entirely when the team has no name
          label = f" ({self.name})" if self.name else ""
          cls_name = self.__class__.__name__
          return f"{cls_name}[{len(self.agents)}]{label}: {member_names}"
      

      _on_node_added

      _on_node_added(index: int, node: MessageNode[Any, Any])
      

      Handler for adding nodes to the team.

      Source code in src/llmling_agent/delegation/base_team.py
      159
      160
      161
      162
      163
      164
      def _on_node_added(self, index: int, node: MessageNode[Any, Any]):
          """React to a node being inserted into the member list.

          Agents get the team's MCP provider added to their tools; other node
          types are left untouched.
          """
          from llmling_agent.agent import Agent, StructuredAgent

          if not isinstance(node, (Agent, StructuredAgent)):
              return
          node.tools.add_provider(self.mcp)
      

      _on_node_changed

      _on_node_changed(index: int, old: MessageNode, new: MessageNode)
      

      Handle node replacement in the agents list.

      Source code in src/llmling_agent/delegation/base_team.py
      154
      155
      156
      157
      def _on_node_changed(self, index: int, old: MessageNode, new: MessageNode):
          """React to a member being replaced at ``index``.

          A replacement is handled as a removal of ``old`` followed by an
          insertion of ``new``.
          """
          # Detach the outgoing node first ...
          self._on_node_removed(index, old)
          # ... then attach its replacement
          self._on_node_added(index, new)
      

      _on_node_removed

      _on_node_removed(index: int, node: MessageNode[Any, Any])
      

      Handler for removing nodes from the team.

      Source code in src/llmling_agent/delegation/base_team.py
      170
      171
      172
      173
      174
      175
      def _on_node_removed(self, index: int, node: MessageNode[Any, Any]):
          """React to a node being taken out of the member list.

          Agents get the team's MCP provider removed from their tools; other
          node types are left untouched.
          """
          from llmling_agent.agent import Agent, StructuredAgent

          if not isinstance(node, (Agent, StructuredAgent)):
              return
          node.tools.remove_provider(self.mcp)
      

      cancel async

      cancel()
      

      Cancel execution and cleanup.

      Source code in src/llmling_agent/delegation/base_team.py
      350
      351
      352
      353
      354
      async def cancel(self):
          """Request cancellation of the main task (if any) and clean up tasks."""
          main_task = self._main_task
          if main_task:
              main_task.cancel()
          # Cleanup runs regardless of whether a main task existed
          await self.cleanup_tasks()
      

      distribute async

      distribute(
          content: str,
          *,
          tools: list[str] | None = None,
          resources: list[str] | None = None,
          metadata: dict[str, Any] | None = None,
      )
      

      Distribute content and capabilities to all team members.

      Source code in src/llmling_agent/delegation/base_team.py
      442
      443
      444
      445
      446
      447
      448
      449
      450
      451
      452
      453
      454
      455
      456
      457
      458
      459
      460
      461
      462
      463
      464
      465
      async def distribute(
          self,
          content: str,
          *,
          tools: list[str] | None = None,
          resources: list[str] | None = None,
          metadata: dict[str, Any] | None = None,
      ):
          """Hand content, tools and resources to every agent of the team.

          Args:
              content: Text added to each agent's conversation as context message
              tools: Tools to register on each agent
              resources: Context sources each agent's conversation should load
              metadata: Metadata attached to the context message
          """
          # Missing or empty collections simply result in no iterations below
          tool_names = tools or ()
          sources = resources or ()

          for member in self.iter_agents():
              member.conversation.add_context_message(
                  content, source="distribution", metadata=metadata
              )
              for tool_name in tool_names:
                  member.tools.register_tool(tool_name)
              for source in sources:
                  await member.conversation.load_context_source(source)
      

      get_structure_diagram

      get_structure_diagram() -> str
      

      Generate mermaid flowchart of node hierarchy.

      Source code in src/llmling_agent/delegation/base_team.py
      356
      357
      358
      359
      360
      361
      362
      363
      364
      365
      366
      367
      368
      369
      370
      371
      372
      373
      374
      375
      376
      377
      378
      def get_structure_diagram(self) -> str:
          """Render the member hierarchy of this team as a mermaid flowchart.

          Each member becomes a diagram node; members of nested teams are added
          below their team, connected by an edge.
          """
          diagram = ["flowchart TD"]

          def visit(current: MessageNode[Any, Any], parent_id: str | None = None):
              """Emit ``current``, its edge from the parent, then its members."""
              current_id = f"node_{id(current)}"
              diagram.append(f"    {current_id}[{current.name}]")
              if parent_id:
                  diagram.append(f"    {parent_id} --> {current_id}")

              from llmling_agent.delegation.base_team import BaseTeam

              # Only teams have members to descend into
              if not isinstance(current, BaseTeam):
                  return
              for child in current.agents:
                  visit(child, current_id)

          # The direct members of this team are the roots of the diagram
          for root in self.agents:
              visit(root)

          return "\n".join(diagram)
      

      is_busy

      is_busy() -> bool
      

      Check if team is processing any tasks.

      Source code in src/llmling_agent/delegation/base_team.py
      271
      272
      273
      def is_busy(self) -> bool:
          """Tell whether there are pending tasks or a main task is set."""
          has_pending = bool(self._pending_tasks)
          return has_pending or bool(self._main_task)
      

      iter_agents

      iter_agents() -> Iterator[AnyAgent[Any, Any]]
      

      Recursively iterate over all child agents.

      Source code in src/llmling_agent/delegation/base_team.py
      380
      381
      382
      383
      384
      385
      386
      387
      388
      389
      390
      391
      392
      def iter_agents(self) -> Iterator[AnyAgent[Any, Any]]:
          """Yield every agent below this team, descending into nested teams.

          Raises:
              ValueError: If a member is neither a team nor an agent.
          """
          from llmling_agent.agent import Agent, StructuredAgent

          for member in self.agents:
              if isinstance(member, BaseTeam):
                  # Nested team: flatten its agents into this iteration
                  yield from member.iter_agents()
              elif isinstance(member, (Agent, StructuredAgent)):
                  yield member
              else:
                  msg = f"Invalid node type: {type(member)}"
                  raise ValueError(msg)
      

      pick_agents async

      pick_agents(task: str) -> Sequence[MessageNode[Any, Any]]
      

      Pick agents to run.

      Source code in src/llmling_agent/delegation/base_team.py
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      async def pick_agents(self, task: str) -> Sequence[MessageNode[Any, Any]]:
          """Select the members that should handle ``task``.

          Without a picker all members are returned. With a picker, a single
          pick is requested when ``num_picks`` is 1, otherwise a multi-pick
          with ``num_picks`` as upper bound (and at least one pick).
          """
          if not self.picker:
              return list(self.agents)

          if self.num_picks == 1:
              single = await self.picker.talk.pick(self, task, self.pick_prompt)
              return [single.selection]

          multiple = await self.picker.talk.pick_multiple(
              self,
              task,
              min_picks=self.num_picks or 1,
              max_picks=self.num_picks,
              prompt=self.pick_prompt,
          )
          return multiple.selections
      

      run_in_background async

      run_in_background(
          *prompts: AnyPromptType | Image | PathLike[str] | None,
          max_count: int | None = 1,
          interval: float = 1.0,
          **kwargs: Any,
      ) -> ExtendedTeamTalk
      

      Start execution in background.

      Parameters:

      Name Type Description Default
      prompts AnyPromptType | Image | PathLike[str] | None

      Prompts to execute

      ()
      max_count int | None

      Maximum number of executions (None = run indefinitely)

      1
      interval float

      Seconds between executions

      1.0
      **kwargs Any

      Additional args for execute()

      {}
      Source code in src/llmling_agent/delegation/base_team.py
      297
      298
      299
      300
      301
      302
      303
      304
      305
      306
      307
      308
      309
      310
      311
      312
      313
      314
      315
      316
      317
      318
      319
      320
      321
      322
      323
      324
      325
      326
      327
      328
      329
      330
      331
      332
      333
      async def run_in_background(
          self,
          *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
          max_count: int | None = 1,  # 1 = single execution, None = indefinite
          interval: float = 1.0,
          **kwargs: Any,
      ) -> ExtendedTeamTalk:
          """Launch repeated execution as a background task.

          Args:
              prompts: Prompts passed to every execution
              max_count: Number of executions to perform (None = no limit)
              interval: Pause in seconds between two executions
              **kwargs: Extra keyword arguments forwarded to execute()

          Raises:
              RuntimeError: If a main task is already set.
          """
          if self._main_task:
              msg = "Execution already running"
              raise RuntimeError(msg)

          unlimited = max_count is None
          self._infinite = unlimited

          def _more_runs(done: int) -> bool:
              """Tell whether another execution is due after ``done`` runs."""
              return unlimited or done < max_count

          async def _continuous() -> ChatMessage[Any] | None:
              runs = 0
              latest = None
              while _more_runs(runs):
                  try:
                      outcome = await self.execute(*prompts, **kwargs)
                      latest = outcome[-1].message if outcome else None
                      runs += 1
                      # Only pause when another execution will follow
                      if _more_runs(runs):
                          await asyncio.sleep(interval)
                  except asyncio.CancelledError:
                      # Cancellation ends the loop; the last message obtained
                      # so far is still returned
                      logger.debug("Background execution cancelled")
                      break
              return latest

          self._main_task = self.create_task(_continuous(), name="main_execution")
          return self._team_talk
      

      run_sync

      run_sync(
          *prompt: AnyPromptType | Image | PathLike[str], store_history: bool = True
      ) -> ChatMessage[TResult]
      

      Run the team synchronously (convenience wrapper).

      Parameters:

      Name Type Description Default
      prompt AnyPromptType | Image | PathLike[str]

      User query or instruction

      ()
      store_history bool

      Whether the message exchange should be added to the context window

      True

      Returns: Result containing response and run information

      Source code in src/llmling_agent/delegation/base_team.py
      530
      531
      532
      533
      534
      535
      536
      537
      538
      539
      540
      541
      542
      543
      544
      545
      def run_sync(
          self,
          *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str],
          store_history: bool = True,
      ) -> ChatMessage[TResult]:
          """Blocking convenience wrapper around ``run``.

          Args:
              prompt: User query or instruction
              store_history: Whether the message exchange should be added to the
                             context window
          Returns:
              Whatever ``run_task_sync`` returns for the ``run`` coroutine
          """
          return self.run_task_sync(self.run(*prompt, store_history=store_history))
      

      stop async

      stop()
      

      Stop background execution if running.

      Source code in src/llmling_agent/delegation/base_team.py
      275
      276
      277
      278
      279
      280
      281
      async def stop(self):
          """Stop background execution if running.

          Cancels the main task when it is still active and waits for it to
          finish. Afterwards the task handle is cleared and remaining tasks
          are cleaned up; both happen even if waiting for the task fails.
          """
          task = self._main_task
          try:
              if task and not task.done():
                  task.cancel()
                  try:
                      await task
                  except asyncio.CancelledError:
                      # Expected outcome of the cancellation requested above,
                      # e.g. when the task was cancelled before it reached
                      # code that handles the request itself.
                      pass
          finally:
              # Always leave a consistent state so a new run can be started
              self._main_task = None
              await self.cleanup_tasks()
      

      temporary_state async

      temporary_state(
          *,
          system_prompts: list[AnyPromptType] | None = None,
          replace_prompts: bool = False,
          tools: list[ToolType] | None = None,
          replace_tools: bool = False,
          history: list[AnyPromptType] | SessionQuery | None = None,
          replace_history: bool = False,
          pause_routing: bool = False,
          model: ModelType | None = None,
          provider: AgentProvider | None = None,
      ) -> AsyncIterator[Self]
      

      Temporarily modify state of all agents in the team.

      All agents in the team will enter their temporary state simultaneously.

      Parameters:

      Name Type Description Default
      system_prompts list[AnyPromptType] | None

      Temporary system prompts to use

      None
      replace_prompts bool

      Whether to replace existing prompts

      False
      tools list[ToolType] | None

      Temporary tools to make available

      None
      replace_tools bool

      Whether to replace existing tools

      False
      history list[AnyPromptType] | SessionQuery | None

      Conversation history (prompts or query)

      None
      replace_history bool

      Whether to replace existing history

      False
      pause_routing bool

      Whether to pause message routing

      False
      model ModelType | None

      Temporary model override

      None
      provider AgentProvider | None

      Temporary provider override

      None
      Source code in src/llmling_agent/delegation/base_team.py
      467
      468
      469
      470
      471
      472
      473
      474
      475
      476
      477
      478
      479
      480
      481
      482
      483
      484
      485
      486
      487
      488
      489
      490
      491
      492
      493
      494
      495
      496
      497
      498
      499
      500
      501
      502
      503
      504
      505
      506
      507
      508
      509
      510
      511
      512
      513
      514
      515
      516
      517
      518
      519
      520
      521
      @asynccontextmanager
      async def temporary_state(
          self,
          *,
          system_prompts: list[AnyPromptType] | None = None,
          replace_prompts: bool = False,
          tools: list[ToolType] | None = None,
          replace_tools: bool = False,
          history: list[AnyPromptType] | SessionQuery | None = None,
          replace_history: bool = False,
          pause_routing: bool = False,
          model: ModelType | None = None,
          provider: AgentProvider | None = None,
      ) -> AsyncIterator[Self]:
          """Temporarily modify state of all agents in the team.
      
          All agents in the team will enter their temporary state simultaneously.
      
          Args:
              system_prompts: Temporary system prompts to use
              replace_prompts: Whether to replace existing prompts
              tools: Temporary tools to make available
              replace_tools: Whether to replace existing tools
              history: Conversation history (prompts or query)
              replace_history: Whether to replace existing history
              pause_routing: Whether to pause message routing
              model: Temporary model override
              provider: Temporary provider override
          """
          # Get all agents (flattened) before entering context
          agents = list(self.iter_agents())
      
          async with AsyncExitStack() as stack:
              if pause_routing:
                  await stack.enter_async_context(self.connections.paused_routing())
              # Enter temporary state for all agents
              for agent in agents:
                  await stack.enter_async_context(
                      agent.temporary_state(
                          system_prompts=system_prompts,
                          replace_prompts=replace_prompts,
                          tools=tools,
                          replace_tools=replace_tools,
                          history=history,
                          replace_history=replace_history,
                          pause_routing=pause_routing,
                          model=model,
                          provider=provider,
                      )
                  )
              try:
                  yield self
              finally:
                  # AsyncExitStack will handle cleanup of all states
                  pass
      

      to_tool

      to_tool(*, name: str | None = None, description: str | None = None) -> Tool
      

      Create a tool that forwards a prompt to this team and returns the result data.

      Parameters:

      Name Type Description Default
      name str | None

      Optional tool name override

      None
      description str | None

      Optional tool description override

      None
      Source code in src/llmling_agent/delegation/base_team.py
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      def to_tool(self, *, name: str | None = None, description: str | None = None) -> Tool:
          """Wrap this node in a tool that forwards a prompt to ``run``.

          Args:
              name: Tool name used instead of the default ``ask_<node name>``
              description: Text used instead of the default tool description
          """
          tool_name = name or f"ask_{self.name}"
          summary = description or f"Get expert answer from node {self.name}"
          # The node's own description, if present, follows after a blank line
          docstring = f"{summary}\n\n{self.description}" if self.description else summary

          async def wrapped_tool(prompt: str) -> TResult:
              return (await self.run(prompt)).data

          wrapped_tool.__name__ = tool_name
          wrapped_tool.__doc__ = docstring

          return Tool.from_callable(
              wrapped_tool,
              name_override=tool_name,
              description_override=docstring,
          )
      

      wait async

      wait() -> ChatMessage[Any] | None
      

      Wait for background execution to complete and return last message.

      Source code in src/llmling_agent/delegation/base_team.py
      283
      284
      285
      286
      287
      288
      289
      290
      291
      292
      293
      294
      295
      async def wait(self) -> ChatMessage[Any] | None:
          """Await the background execution and hand back its result.

          Raises:
              RuntimeError: If no main task is set, or if the execution was
                  started without a limit and therefore never finishes.
          """
          task = self._main_task
          if not task:
              msg = "No execution running"
              raise RuntimeError(msg)
          if self._infinite:
              msg = "Cannot wait on infinite execution"
              raise RuntimeError(msg)
          try:
              return await task
          finally:
              # Runs on success and on failure of the awaited task alike
              await self.cleanup_tasks()
              self._main_task = None
      

      Show source on GitHub