Skip to content

team

Class info

Classes

Name Children Inherits
AgentResponse
llmling_agent.messaging.messages
Result from an agent's execution.
    BaseTeam
    llmling_agent.delegation.base_team
    Base class for Team and TeamRun.
    ChatMessage
    llmling_agent.messaging.messages
    Common message format for all UI types.
      Team
      llmling_agent.delegation.team
      Group of agents that can execute together.
        TeamResponse
        llmling_agent.messaging.messages
        Results from a team execution.

          🛈 DocStrings

          Team

          Bases: BaseTeam[TDeps, Any]

          Group of agents that can execute together.

          Source code in src/llmling_agent/delegation/team.py
           34
           35
           36
           37
           38
           39
           40
           41
           42
           43
           44
           45
           46
           47
           48
           49
           50
           51
           52
           53
           54
           55
           56
           57
           58
           59
           60
           61
           62
           63
           64
           65
           66
           67
           68
           69
           70
           71
           72
           73
           74
           75
           76
           77
           78
           79
           80
           81
           82
           83
           84
           85
           86
           87
           88
           89
           90
           91
           92
           93
           94
           95
           96
           97
           98
           99
          100
          101
          102
          103
          104
          105
          106
          107
          108
          109
          110
          111
          112
          113
          114
          115
          116
          117
          118
          119
          120
          121
          122
          123
          124
          125
          126
          127
          128
          129
          130
          131
          132
          133
          134
          135
          136
          137
          138
          139
          140
          141
          142
          143
          144
          145
          146
          147
          148
          149
          150
          151
          152
          153
          154
          155
          156
          157
          158
          159
          160
          161
          162
          163
          164
          165
          166
          167
          168
          169
          170
          171
          172
          173
          174
          175
          176
          177
          178
          179
          180
          181
          182
          183
          184
          185
          186
          187
          188
          189
          190
          191
          192
          193
          194
          195
          196
          197
          198
          199
          200
          201
          202
          203
          204
          205
          206
          207
          208
          209
          210
          211
          212
          213
          214
          215
          216
          217
          218
          219
          220
          221
          222
          223
          224
          225
          226
          227
          228
          229
          230
          231
          232
          233
          234
          235
          236
          237
          238
          239
          class Team[TDeps](BaseTeam[TDeps, Any]):
              """Group of agents that can execute together."""
          
              async def execute(
                  self,
                  *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                  **kwargs: Any,
              ) -> TeamResponse:
                  """Run all agents in parallel with monitoring."""
                  from llmling_agent.talk.talk import Talk
          
                  self._team_talk.clear()
          
                  start_time = datetime.now()
                  responses: list[AgentResponse[Any]] = []
                  errors: dict[str, Exception] = {}
                  final_prompt = list(prompts)
                  if self.shared_prompt:
                      final_prompt.insert(0, self.shared_prompt)
                  combined_prompt = "\n".join([await to_prompt(p) for p in final_prompt])
                  all_nodes = list(await self.pick_agents(combined_prompt))
                  # Create Talk connections for monitoring this execution
                  execution_talks: list[Talk[Any]] = []
                  for node in all_nodes:
                      talk = Talk[Any](
                          node,
                          [],  # No actual forwarding, just for tracking
                          connection_type="run",
                          queued=True,
                          queue_strategy="latest",
                      )
                      execution_talks.append(talk)
                      self._team_talk.append(talk)  # Add to base class's TeamTalk
          
                  async def _run(node: MessageNode[TDeps, Any]):
                      try:
                          start = perf_counter()
                          message = await node.run(*final_prompt, **kwargs)
                          timing = perf_counter() - start
                          r = AgentResponse(agent_name=node.name, message=message, timing=timing)
                          responses.append(r)
          
                          # Update talk stats for this agent
                          talk = next(t for t in execution_talks if t.source == node)
                          talk._stats.messages.append(message)
          
                      except Exception as e:  # noqa: BLE001
                          errors[node.name] = e
          
                  # Run all agents in parallel
                  await asyncio.gather(*[_run(node) for node in all_nodes])
          
                  return TeamResponse(responses=responses, start_time=start_time, errors=errors)
          
              def __prompt__(self) -> str:
                  """Format team info for prompts."""
                  members = ", ".join(a.name for a in self.agents)
                  desc = f" - {self.description}" if self.description else ""
                  return f"Parallel Team '{self.name}'{desc}\nMembers: {members}"
          
              async def run_iter(
                  self,
                  *prompts: AnyPromptType,
                  **kwargs: Any,
              ) -> AsyncIterator[ChatMessage[Any]]:
                  """Yield messages as they arrive from parallel execution."""
                  queue: asyncio.Queue[ChatMessage[Any] | None] = asyncio.Queue()
                  failures: dict[str, Exception] = {}
          
                  async def _run(node: MessageNode[TDeps, Any]):
                      try:
                          message = await node.run(*prompts, **kwargs)
                          await queue.put(message)
                      except Exception as e:
                          logger.exception("Error executing node %s", node.name)
                          failures[node.name] = e
                          # Put None to maintain queue count
                          await queue.put(None)
          
                  # Get nodes to run
                  combined_prompt = "\n".join([await to_prompt(p) for p in prompts])
                  all_nodes = list(await self.pick_agents(combined_prompt))
          
                  # Start all agents
                  tasks = [asyncio.create_task(_run(n), name=f"run_{n.name}") for n in all_nodes]
          
                  try:
                      # Yield messages as they arrive
                      for _ in all_nodes:
                          if msg := await queue.get():
                              yield msg
          
                      # If any failures occurred, raise error with details
                      if failures:
                          error_details = "\n".join(
                              f"- {name}: {error}" for name, error in failures.items()
                          )
                          error_msg = f"Some nodes failed to execute:\n{error_details}"
                          raise RuntimeError(error_msg)
          
                  finally:
                      # Clean up any remaining tasks
                      for task in tasks:
                          if not task.done():
                              task.cancel()
          
              async def _run(
                  self,
                  *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                  wait_for_connections: bool | None = None,
                  message_id: str | None = None,
                  conversation_id: str | None = None,
                  **kwargs: Any,
              ) -> ChatMessage[list[Any]]:
                  """Run all agents in parallel and return combined message."""
                  result: TeamResponse = await self.execute(*prompts, **kwargs)
                  message_id = message_id or str(uuid4())
                  return ChatMessage(
                      content=[r.message.content for r in result if r.message],
                      role="assistant",
                      name=self.name,
                      message_id=message_id,
                      conversation_id=conversation_id,
                      metadata={
                          "agent_names": [r.agent_name for r in result],
                          "errors": {name: str(error) for name, error in result.errors.items()},
                          "start_time": result.start_time.isoformat(),
                      },
                  )
          
              async def run_job[TJobResult](
                  self,
                  job: Job[TDeps, TJobResult],
                  *,
                  store_history: bool = True,
                  include_agent_tools: bool = True,
              ) -> list[AgentResponse[TJobResult]]:
                  """Execute a job across all team members in parallel.
          
                  Args:
                      job: Job configuration to execute
                      store_history: Whether to add job execution to conversation history
                      include_agent_tools: Whether to include agent's tools alongside job tools
          
                  Returns:
                      List of responses from all agents
          
                  Raises:
                      JobError: If job execution fails for any agent
                      ValueError: If job configuration is invalid
                  """
                  from llmling_agent.agent import Agent, StructuredAgent
                  from llmling_agent.tasks import JobError
          
                  responses: list[AgentResponse[TJobResult]] = []
                  errors: dict[str, Exception] = {}
                  start_time = datetime.now()
          
                  # Validate dependencies for all agents
                  if job.required_dependency is not None:
                      invalid_agents = [
                          agent.name
                          for agent in self.iter_agents()
                          if not isinstance(agent.context.data, job.required_dependency)
                      ]
                      if invalid_agents:
                          msg = (
                              f"Agents {', '.join(invalid_agents)} don't have required "
                              f"dependency type: {job.required_dependency}"
                          )
                          raise JobError(msg)
          
                  try:
                      # Load knowledge for all agents if provided
                      if job.knowledge:
                          # TODO: resources
                          tools = [t.name for t in job.get_tools()]
                          await self.distribute(content="", tools=tools)
          
                      prompt = await job.get_prompt()
          
                      async def _run(agent: MessageNode[TDeps, TJobResult]):
                          assert isinstance(agent, Agent | StructuredAgent)
                          try:
                              with agent.tools.temporary_tools(
                                  job.get_tools(), exclusive=not include_agent_tools
                              ):
                                  start = perf_counter()
                                  resp = AgentResponse(
                                      agent_name=agent.name,
                                      message=await agent.run(prompt, store_history=store_history),  # pyright: ignore
                                      timing=perf_counter() - start,
                                  )
                                  responses.append(resp)
                          except Exception as e:  # noqa: BLE001
                              errors[agent.name] = e
          
                      # Run job in parallel on all agents
                      await asyncio.gather(*[_run(node) for node in self.agents])
          
                      return TeamResponse(responses=responses, start_time=start_time, errors=errors)
          
                  except Exception as e:
                      msg = "Job execution failed"
                      logger.exception(msg)
                      raise JobError(msg) from e
          

          __prompt__

          __prompt__() -> str
          

          Format team info for prompts.

          Source code in src/llmling_agent/delegation/team.py
          88
          89
          90
          91
          92
          def __prompt__(self) -> str:
              """Format team info for prompts."""
              members = ", ".join(a.name for a in self.agents)
              desc = f" - {self.description}" if self.description else ""
              return f"Parallel Team '{self.name}'{desc}\nMembers: {members}"
          

          _run async

          _run(
              *prompts: AnyPromptType | Image | PathLike[str] | None,
              wait_for_connections: bool | None = None,
              message_id: str | None = None,
              conversation_id: str | None = None,
              **kwargs: Any,
          ) -> ChatMessage[list[Any]]
          

          Run all agents in parallel and return combined message.

          Source code in src/llmling_agent/delegation/team.py
          140
          141
          142
          143
          144
          145
          146
          147
          148
          149
          150
          151
          152
          153
          154
          155
          156
          157
          158
          159
          160
          161
          162
          async def _run(
              self,
              *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
              wait_for_connections: bool | None = None,
              message_id: str | None = None,
              conversation_id: str | None = None,
              **kwargs: Any,
          ) -> ChatMessage[list[Any]]:
              """Run all agents in parallel and return combined message."""
              result: TeamResponse = await self.execute(*prompts, **kwargs)
              message_id = message_id or str(uuid4())
              return ChatMessage(
                  content=[r.message.content for r in result if r.message],
                  role="assistant",
                  name=self.name,
                  message_id=message_id,
                  conversation_id=conversation_id,
                  metadata={
                      "agent_names": [r.agent_name for r in result],
                      "errors": {name: str(error) for name, error in result.errors.items()},
                      "start_time": result.start_time.isoformat(),
                  },
              )
          

          execute async

          execute(
              *prompts: AnyPromptType | Image | PathLike[str] | None, **kwargs: Any
          ) -> TeamResponse
          

          Run all agents in parallel with monitoring.

          Source code in src/llmling_agent/delegation/team.py
          37
          38
          39
          40
          41
          42
          43
          44
          45
          46
          47
          48
          49
          50
          51
          52
          53
          54
          55
          56
          57
          58
          59
          60
          61
          62
          63
          64
          65
          66
          67
          68
          69
          70
          71
          72
          73
          74
          75
          76
          77
          78
          79
          80
          81
          82
          83
          84
          85
          86
          async def execute(
              self,
              *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
              **kwargs: Any,
          ) -> TeamResponse:
              """Run all agents in parallel with monitoring."""
              from llmling_agent.talk.talk import Talk
          
              self._team_talk.clear()
          
              start_time = datetime.now()
              responses: list[AgentResponse[Any]] = []
              errors: dict[str, Exception] = {}
              final_prompt = list(prompts)
              if self.shared_prompt:
                  final_prompt.insert(0, self.shared_prompt)
              combined_prompt = "\n".join([await to_prompt(p) for p in final_prompt])
              all_nodes = list(await self.pick_agents(combined_prompt))
              # Create Talk connections for monitoring this execution
              execution_talks: list[Talk[Any]] = []
              for node in all_nodes:
                  talk = Talk[Any](
                      node,
                      [],  # No actual forwarding, just for tracking
                      connection_type="run",
                      queued=True,
                      queue_strategy="latest",
                  )
                  execution_talks.append(talk)
                  self._team_talk.append(talk)  # Add to base class's TeamTalk
          
              async def _run(node: MessageNode[TDeps, Any]):
                  try:
                      start = perf_counter()
                      message = await node.run(*final_prompt, **kwargs)
                      timing = perf_counter() - start
                      r = AgentResponse(agent_name=node.name, message=message, timing=timing)
                      responses.append(r)
          
                      # Update talk stats for this agent
                      talk = next(t for t in execution_talks if t.source == node)
                      talk._stats.messages.append(message)
          
                  except Exception as e:  # noqa: BLE001
                      errors[node.name] = e
          
              # Run all agents in parallel
              await asyncio.gather(*[_run(node) for node in all_nodes])
          
              return TeamResponse(responses=responses, start_time=start_time, errors=errors)
          

          run_iter async

          run_iter(*prompts: AnyPromptType, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]
          

          Yield messages as they arrive from parallel execution.

          Source code in src/llmling_agent/delegation/team.py
           94
           95
           96
           97
           98
           99
          100
          101
          102
          103
          104
          105
          106
          107
          108
          109
          110
          111
          112
          113
          114
          115
          116
          117
          118
          119
          120
          121
          122
          123
          124
          125
          126
          127
          128
          129
          130
          131
          132
          133
          134
          135
          136
          137
          138
          async def run_iter(
              self,
              *prompts: AnyPromptType,
              **kwargs: Any,
          ) -> AsyncIterator[ChatMessage[Any]]:
              """Yield messages as they arrive from parallel execution."""
              queue: asyncio.Queue[ChatMessage[Any] | None] = asyncio.Queue()
              failures: dict[str, Exception] = {}
          
              async def _run(node: MessageNode[TDeps, Any]):
                  try:
                      message = await node.run(*prompts, **kwargs)
                      await queue.put(message)
                  except Exception as e:
                      logger.exception("Error executing node %s", node.name)
                      failures[node.name] = e
                      # Put None to maintain queue count
                      await queue.put(None)
          
              # Get nodes to run
              combined_prompt = "\n".join([await to_prompt(p) for p in prompts])
              all_nodes = list(await self.pick_agents(combined_prompt))
          
              # Start all agents
              tasks = [asyncio.create_task(_run(n), name=f"run_{n.name}") for n in all_nodes]
          
              try:
                  # Yield messages as they arrive
                  for _ in all_nodes:
                      if msg := await queue.get():
                          yield msg
          
                  # If any failures occurred, raise error with details
                  if failures:
                      error_details = "\n".join(
                          f"- {name}: {error}" for name, error in failures.items()
                      )
                      error_msg = f"Some nodes failed to execute:\n{error_details}"
                      raise RuntimeError(error_msg)
          
              finally:
                  # Clean up any remaining tasks
                  for task in tasks:
                      if not task.done():
                          task.cancel()
          

          run_job async

          run_job(
              job: Job[TDeps, TJobResult],
              *,
              store_history: bool = True,
              include_agent_tools: bool = True,
          ) -> list[AgentResponse[TJobResult]]
          

          Execute a job across all team members in parallel.

          Parameters:

          Name Type Description Default
          job Job[TDeps, TJobResult]

          Job configuration to execute

          required
          store_history bool

          Whether to add job execution to conversation history

          True
          include_agent_tools bool

          Whether to include agent's tools alongside job tools

          True

          Returns:

          Type Description
          list[AgentResponse[TJobResult]]

          List of responses from all agents

          Raises:

          Type Description
          JobError

          If job execution fails for any agent

          ValueError

          If job configuration is invalid

          Source code in src/llmling_agent/delegation/team.py
          164
          165
          166
          167
          168
          169
          170
          171
          172
          173
          174
          175
          176
          177
          178
          179
          180
          181
          182
          183
          184
          185
          186
          187
          188
          189
          190
          191
          192
          193
          194
          195
          196
          197
          198
          199
          200
          201
          202
          203
          204
          205
          206
          207
          208
          209
          210
          211
          212
          213
          214
          215
          216
          217
          218
          219
          220
          221
          222
          223
          224
          225
          226
          227
          228
          229
          230
          231
          232
          233
          234
          235
          236
          237
          238
          239
          async def run_job[TJobResult](
              self,
              job: Job[TDeps, TJobResult],
              *,
              store_history: bool = True,
              include_agent_tools: bool = True,
          ) -> list[AgentResponse[TJobResult]]:
              """Execute a job across all team members in parallel.
          
              Args:
                  job: Job configuration to execute
                  store_history: Whether to add job execution to conversation history
                  include_agent_tools: Whether to include agent's tools alongside job tools
          
              Returns:
                  List of responses from all agents
          
              Raises:
                  JobError: If job execution fails for any agent
                  ValueError: If job configuration is invalid
              """
              from llmling_agent.agent import Agent, StructuredAgent
              from llmling_agent.tasks import JobError
          
              responses: list[AgentResponse[TJobResult]] = []
              errors: dict[str, Exception] = {}
              start_time = datetime.now()
          
              # Validate dependencies for all agents
              if job.required_dependency is not None:
                  invalid_agents = [
                      agent.name
                      for agent in self.iter_agents()
                      if not isinstance(agent.context.data, job.required_dependency)
                  ]
                  if invalid_agents:
                      msg = (
                          f"Agents {', '.join(invalid_agents)} don't have required "
                          f"dependency type: {job.required_dependency}"
                      )
                      raise JobError(msg)
          
              try:
                  # Load knowledge for all agents if provided
                  if job.knowledge:
                      # TODO: resources
                      tools = [t.name for t in job.get_tools()]
                      await self.distribute(content="", tools=tools)
          
                  prompt = await job.get_prompt()
          
                  async def _run(agent: MessageNode[TDeps, TJobResult]):
                      assert isinstance(agent, Agent | StructuredAgent)
                      try:
                          with agent.tools.temporary_tools(
                              job.get_tools(), exclusive=not include_agent_tools
                          ):
                              start = perf_counter()
                              resp = AgentResponse(
                                  agent_name=agent.name,
                                  message=await agent.run(prompt, store_history=store_history),  # pyright: ignore
                                  timing=perf_counter() - start,
                              )
                              responses.append(resp)
                      except Exception as e:  # noqa: BLE001
                          errors[agent.name] = e
          
                  # Run job in parallel on all agents
                  await asyncio.gather(*[_run(node) for node in self.agents])
          
                  return TeamResponse(responses=responses, start_time=start_time, errors=errors)
          
              except Exception as e:
                  msg = "Job execution failed"
                  logger.exception(msg)
                  raise JobError(msg) from e