
Team

Base classes

BaseTeam (llmling_agent.delegation.base_team): Base class for Team and TeamRun.
Generic (typing): Abstract base class for generic types.

⋔ Inheritance diagram

graph TD
  Team["team.Team"]
  BaseTeam["base_team.BaseTeam"]
  MessageNode["messagenode.MessageNode"]
  MessageEmitter["messageemitter.MessageEmitter"]
  ABC["abc.ABC"]
  Object["builtins.object"]
  Generic["typing.Generic"]
  BaseTeam --> Team
  MessageNode --> BaseTeam
  MessageEmitter --> MessageNode
  ABC --> MessageEmitter
  Object --> ABC
  Generic --> MessageEmitter
  Object --> Generic
  Generic --> MessageNode
  Generic --> BaseTeam
  Generic --> Team

🛈 DocStrings

Bases: BaseTeam[TDeps, Any]

Group of agents that can execute together.

Source code in src/llmling_agent/delegation/team.py
class Team[TDeps](BaseTeam[TDeps, Any]):
    """Group of agents that can execute together."""

    async def execute(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
        **kwargs: Any,
    ) -> TeamResponse:
        """Run all agents in parallel with monitoring."""
        from llmling_agent.talk.talk import Talk

        self._team_talk.clear()

        start_time = get_now()
        responses: list[AgentResponse[Any]] = []
        errors: dict[str, Exception] = {}
        final_prompt = list(prompts)
        if self.shared_prompt:
            final_prompt.insert(0, self.shared_prompt)
        combined_prompt = "\n".join([await to_prompt(p) for p in final_prompt])
        all_nodes = list(await self.pick_agents(combined_prompt))
        # Create Talk connections for monitoring this execution
        execution_talks: list[Talk[Any]] = []
        for node in all_nodes:
            talk = Talk[Any](
                node,
                [],  # No actual forwarding, just for tracking
                connection_type="run",
                queued=True,
                queue_strategy="latest",
            )
            execution_talks.append(talk)
            self._team_talk.append(talk)  # Add to base class's TeamTalk

        async def _run(node: MessageNode[TDeps, Any]):
            try:
                start = perf_counter()
                message = await node.run(*final_prompt, **kwargs)
                timing = perf_counter() - start
                r = AgentResponse(agent_name=node.name, message=message, timing=timing)
                responses.append(r)

                # Update talk stats for this agent
                talk = next(t for t in execution_talks if t.source == node)
                talk._stats.messages.append(message)

            except Exception as e:  # noqa: BLE001
                errors[node.name] = e

        # Run all agents in parallel
        await asyncio.gather(*[_run(node) for node in all_nodes])

        return TeamResponse(responses=responses, start_time=start_time, errors=errors)

    def __prompt__(self) -> str:
        """Format team info for prompts."""
        members = ", ".join(a.name for a in self.agents)
        desc = f" - {self.description}" if self.description else ""
        return f"Parallel Team '{self.name}'{desc}\nMembers: {members}"

    async def run_iter(
        self,
        *prompts: AnyPromptType,
        **kwargs: Any,
    ) -> AsyncIterator[ChatMessage[Any]]:
        """Yield messages as they arrive from parallel execution."""
        queue: asyncio.Queue[ChatMessage[Any] | None] = asyncio.Queue()
        failures: dict[str, Exception] = {}

        async def _run(node: MessageNode[TDeps, Any]):
            try:
                message = await node.run(*prompts, **kwargs)
                await queue.put(message)
            except Exception as e:
                logger.exception("Error executing node %s", node.name)
                failures[node.name] = e
                # Put None to maintain queue count
                await queue.put(None)

        # Get nodes to run
        combined_prompt = "\n".join([await to_prompt(p) for p in prompts])
        all_nodes = list(await self.pick_agents(combined_prompt))

        # Start all agents
        tasks = [asyncio.create_task(_run(n), name=f"run_{n.name}") for n in all_nodes]

        try:
            # Yield messages as they arrive
            for _ in all_nodes:
                if msg := await queue.get():
                    yield msg

            # If any failures occurred, raise error with details
            if failures:
                error_details = "\n".join(
                    f"- {name}: {error}" for name, error in failures.items()
                )
                error_msg = f"Some nodes failed to execute:\n{error_details}"
                raise RuntimeError(error_msg)

        finally:
            # Clean up any remaining tasks
            for task in tasks:
                if not task.done():
                    task.cancel()

    async def _run(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
        wait_for_connections: bool | None = None,
        message_id: str | None = None,
        conversation_id: str | None = None,
        **kwargs: Any,
    ) -> ChatMessage[list[Any]]:
        """Run all agents in parallel and return combined message."""
        result: TeamResponse = await self.execute(*prompts, **kwargs)
        message_id = message_id or str(uuid4())
        return ChatMessage(
            content=[r.message.content for r in result if r.message],
            role="assistant",
            name=self.name,
            message_id=message_id,
            conversation_id=conversation_id,
            metadata={
                "agent_names": [r.agent_name for r in result],
                "errors": {name: str(error) for name, error in result.errors.items()},
                "start_time": result.start_time.isoformat(),
            },
        )

    async def run_stream(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
        **kwargs: Any,
    ) -> AsyncIterator[
        tuple[MessageNode[Any, Any], AgentStreamEvent | StreamCompleteEvent]
    ]:
        """Stream responses from all team members in parallel.

        Args:
            prompts: Input prompts to process in parallel
            kwargs: Additional arguments passed to each agent

        Yields:
            Tuples of (agent, event) where agent is the Agent instance
            and event is the streaming event from that agent.
        """
        # Get nodes to run
        combined_prompt = "\n".join([await to_prompt(p) for p in prompts])
        all_nodes = list(await self.pick_agents(combined_prompt))

        # Create list of streams that yield (agent, event) tuples
        agent_streams = [
            normalize_stream_for_teams(agent, *prompts, **kwargs)
            for agent in all_nodes
            if hasattr(agent, "run_stream")
        ]

        # Merge all agent streams
        async for agent_event_tuple in as_generated(agent_streams):
            yield agent_event_tuple

    async def run_job[TJobResult](
        self,
        job: Job[TDeps, TJobResult],
        *,
        store_history: bool = True,
        include_agent_tools: bool = True,
    ) -> list[AgentResponse[TJobResult]]:
        """Execute a job across all team members in parallel.

        Args:
            job: Job configuration to execute
            store_history: Whether to add job execution to conversation history
            include_agent_tools: Whether to include agent's tools alongside job tools

        Returns:
            List of responses from all agents

        Raises:
            JobError: If job execution fails for any agent
            ValueError: If job configuration is invalid
        """
        from llmling_agent.agent import Agent, StructuredAgent
        from llmling_agent.tasks import JobError

        responses: list[AgentResponse[TJobResult]] = []
        errors: dict[str, Exception] = {}
        start_time = get_now()

        # Validate dependencies for all agents
        if job.required_dependency is not None:
            invalid_agents = [
                agent.name
                for agent in self.iter_agents()
                if not isinstance(agent.context.data, job.required_dependency)
            ]
            if invalid_agents:
                msg = (
                    f"Agents {', '.join(invalid_agents)} don't have required "
                    f"dependency type: {job.required_dependency}"
                )
                raise JobError(msg)

        try:
            # Load knowledge for all agents if provided
            if job.knowledge:
                # TODO: resources
                tools = [t.name for t in job.get_tools()]
                await self.distribute(content="", tools=tools)

            prompt = await job.get_prompt()

            async def _run(agent: MessageNode[TDeps, TJobResult]):
                assert isinstance(agent, Agent | StructuredAgent)
                try:
                    with agent.tools.temporary_tools(
                        job.get_tools(), exclusive=not include_agent_tools
                    ):
                        start = perf_counter()
                        resp = AgentResponse(
                            agent_name=agent.name,
                            message=await agent.run(prompt, store_history=store_history),  # pyright: ignore
                            timing=perf_counter() - start,
                        )
                        responses.append(resp)
                except Exception as e:  # noqa: BLE001
                    errors[agent.name] = e

            # Run job in parallel on all agents
            await asyncio.gather(*[_run(node) for node in self.agents])

            return TeamResponse(responses=responses, start_time=start_time, errors=errors)

        except Exception as e:
            msg = "Job execution failed"
            logger.exception(msg)
            raise JobError(msg) from e
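
A minimal usage sketch for the class as a whole, run via asyncio. It is hedged: the Team constructor is not shown on this page, so building the team from two pre-existing agents is an assumption, and agent_a / agent_b are hypothetical, already-configured agents:

import asyncio

from llmling_agent.delegation.team import Team

async def main() -> None:
    # agent_a and agent_b stand in for existing agents (hypothetical).
    team = Team([agent_a, agent_b], name="writers")  # constructor signature assumed
    # execute() runs every member in parallel and gathers the results.
    result = await team.execute("Summarize the design doc")
    for response in result:  # TeamResponse iterates AgentResponse objects
        if response.message:
            print(response.agent_name, response.message.content)

asyncio.run(main())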

__prompt__

__prompt__() -> str

Format team info for prompts.

Source code in src/llmling_agent/delegation/team.py
def __prompt__(self) -> str:
    """Format team info for prompts."""
    members = ", ".join(a.name for a in self.agents)
    desc = f" - {self.description}" if self.description else ""
    return f"Parallel Team '{self.name}'{desc}\nMembers: {members}"

execute async

execute(
    *prompts: AnyPromptType | Image | PathLike[str] | None, **kwargs: Any
) -> TeamResponse

Run all agents in parallel with monitoring.

Source code in src/llmling_agent/delegation/team.py
async def execute(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
    **kwargs: Any,
) -> TeamResponse:
    """Run all agents in parallel with monitoring."""
    from llmling_agent.talk.talk import Talk

    self._team_talk.clear()

    start_time = get_now()
    responses: list[AgentResponse[Any]] = []
    errors: dict[str, Exception] = {}
    final_prompt = list(prompts)
    if self.shared_prompt:
        final_prompt.insert(0, self.shared_prompt)
    combined_prompt = "\n".join([await to_prompt(p) for p in final_prompt])
    all_nodes = list(await self.pick_agents(combined_prompt))
    # Create Talk connections for monitoring this execution
    execution_talks: list[Talk[Any]] = []
    for node in all_nodes:
        talk = Talk[Any](
            node,
            [],  # No actual forwarding, just for tracking
            connection_type="run",
            queued=True,
            queue_strategy="latest",
        )
        execution_talks.append(talk)
        self._team_talk.append(talk)  # Add to base class's TeamTalk

    async def _run(node: MessageNode[TDeps, Any]):
        try:
            start = perf_counter()
            message = await node.run(*final_prompt, **kwargs)
            timing = perf_counter() - start
            r = AgentResponse(agent_name=node.name, message=message, timing=timing)
            responses.append(r)

            # Update talk stats for this agent
            talk = next(t for t in execution_talks if t.source == node)
            talk._stats.messages.append(message)

        except Exception as e:  # noqa: BLE001
            errors[node.name] = e

    # Run all agents in parallel
    await asyncio.gather(*[_run(node) for node in all_nodes])

    return TeamResponse(responses=responses, start_time=start_time, errors=errors)
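
Because per-agent exceptions are caught and stored rather than re-raised, callers should inspect the errors mapping on the result. A short sketch, inside an async context, with team an existing Team instance:

result = await team.execute("Draft a changelog entry")
for response in result:
    print(response.agent_name, f"answered in {response.timing:.2f}s")
for name, exc in result.errors.items():  # failures are collected here, not raised
    print(f"{name} failed: {exc!r}")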

run_iter async

run_iter(*prompts: AnyPromptType, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]

Yield messages as they arrive from parallel execution.

Source code in src/llmling_agent/delegation/team.py
async def run_iter(
    self,
    *prompts: AnyPromptType,
    **kwargs: Any,
) -> AsyncIterator[ChatMessage[Any]]:
    """Yield messages as they arrive from parallel execution."""
    queue: asyncio.Queue[ChatMessage[Any] | None] = asyncio.Queue()
    failures: dict[str, Exception] = {}

    async def _run(node: MessageNode[TDeps, Any]):
        try:
            message = await node.run(*prompts, **kwargs)
            await queue.put(message)
        except Exception as e:
            logger.exception("Error executing node %s", node.name)
            failures[node.name] = e
            # Put None to maintain queue count
            await queue.put(None)

    # Get nodes to run
    combined_prompt = "\n".join([await to_prompt(p) for p in prompts])
    all_nodes = list(await self.pick_agents(combined_prompt))

    # Start all agents
    tasks = [asyncio.create_task(_run(n), name=f"run_{n.name}") for n in all_nodes]

    try:
        # Yield messages as they arrive
        for _ in all_nodes:
            if msg := await queue.get():
                yield msg

        # If any failures occurred, raise error with details
        if failures:
            error_details = "\n".join(
                f"- {name}: {error}" for name, error in failures.items()
            )
            error_msg = f"Some nodes failed to execute:\n{error_details}"
            raise RuntimeError(error_msg)

    finally:
        # Clean up any remaining tasks
        for task in tasks:
            if not task.done():
                task.cancel()
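
A consumption sketch, inside an async context, with team an existing Team instance. Note the failure contract: all successful messages are yielded first, and a RuntimeError with aggregated per-node details is raised afterwards if any node failed:

try:
    async for message in team.run_iter("Review this patch"):
        print(message.name, message.content)
except RuntimeError as exc:
    print(exc)  # "Some nodes failed to execute:" plus one line per failure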

run_job async

run_job(
    job: Job[TDeps, TJobResult],
    *,
    store_history: bool = True,
    include_agent_tools: bool = True,
) -> list[AgentResponse[TJobResult]]

Execute a job across all team members in parallel.

Parameters:

    job (Job[TDeps, TJobResult], required):
        Job configuration to execute.
    store_history (bool, default True):
        Whether to add job execution to conversation history.
    include_agent_tools (bool, default True):
        Whether to include agent's tools alongside job tools.

Returns:

    list[AgentResponse[TJobResult]]:
        List of responses from all agents.

Raises:

    JobError:
        If job execution fails for any agent.
    ValueError:
        If job configuration is invalid.

Source code in src/llmling_agent/delegation/team.py
async def run_job[TJobResult](
    self,
    job: Job[TDeps, TJobResult],
    *,
    store_history: bool = True,
    include_agent_tools: bool = True,
) -> list[AgentResponse[TJobResult]]:
    """Execute a job across all team members in parallel.

    Args:
        job: Job configuration to execute
        store_history: Whether to add job execution to conversation history
        include_agent_tools: Whether to include agent's tools alongside job tools

    Returns:
        List of responses from all agents

    Raises:
        JobError: If job execution fails for any agent
        ValueError: If job configuration is invalid
    """
    from llmling_agent.agent import Agent, StructuredAgent
    from llmling_agent.tasks import JobError

    responses: list[AgentResponse[TJobResult]] = []
    errors: dict[str, Exception] = {}
    start_time = get_now()

    # Validate dependencies for all agents
    if job.required_dependency is not None:
        invalid_agents = [
            agent.name
            for agent in self.iter_agents()
            if not isinstance(agent.context.data, job.required_dependency)
        ]
        if invalid_agents:
            msg = (
                f"Agents {', '.join(invalid_agents)} don't have required "
                f"dependency type: {job.required_dependency}"
            )
            raise JobError(msg)

    try:
        # Load knowledge for all agents if provided
        if job.knowledge:
            # TODO: resources
            tools = [t.name for t in job.get_tools()]
            await self.distribute(content="", tools=tools)

        prompt = await job.get_prompt()

        async def _run(agent: MessageNode[TDeps, TJobResult]):
            assert isinstance(agent, Agent | StructuredAgent)
            try:
                with agent.tools.temporary_tools(
                    job.get_tools(), exclusive=not include_agent_tools
                ):
                    start = perf_counter()
                    resp = AgentResponse(
                        agent_name=agent.name,
                        message=await agent.run(prompt, store_history=store_history),  # pyright: ignore
                        timing=perf_counter() - start,
                    )
                    responses.append(resp)
            except Exception as e:  # noqa: BLE001
                errors[agent.name] = e

        # Run job in parallel on all agents
        await asyncio.gather(*[_run(node) for node in self.agents])

        return TeamResponse(responses=responses, start_time=start_time, errors=errors)

    except Exception as e:
        msg = "Job execution failed"
        logger.exception(msg)
        raise JobError(msg) from e
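
A hedged sketch of running a job across the team, inside an async context. Job construction is not shown on this page, so job below is assumed to be a preconfigured Job instance. Validation and setup problems surface as JobError, while per-agent run failures are collected in the returned object's errors mapping (the body builds a TeamResponse even though the annotation says list[AgentResponse[TJobResult]]):

from llmling_agent.tasks import JobError  # import path taken from the source above

try:
    responses = await team.run_job(job, include_agent_tools=False)  # job is hypothetical
except JobError as exc:
    print("job could not run:", exc)
else:
    for r in responses:
        print(r.agent_name, f"{r.timing:.2f}s")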

run_stream async

run_stream(
    *prompts: AnyPromptType | Image | PathLike[str], **kwargs: Any
) -> AsyncIterator[tuple[MessageNode[Any, Any], AgentStreamEvent | StreamCompleteEvent]]

Stream responses from all team members in parallel.

Parameters:

    prompts (AnyPromptType | Image | PathLike[str], variadic):
        Input prompts to process in parallel.
    kwargs (Any, variadic):
        Additional arguments passed to each agent.

Yields:

    tuple[MessageNode[Any, Any], AgentStreamEvent | StreamCompleteEvent]:
        Tuples of (agent, event) where agent is the Agent instance and event
        is the streaming event from that agent.

Source code in src/llmling_agent/delegation/team.py
async def run_stream(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
    **kwargs: Any,
) -> AsyncIterator[
    tuple[MessageNode[Any, Any], AgentStreamEvent | StreamCompleteEvent]
]:
    """Stream responses from all team members in parallel.

    Args:
        prompts: Input prompts to process in parallel
        kwargs: Additional arguments passed to each agent

    Yields:
        Tuples of (agent, event) where agent is the Agent instance
        and event is the streaming event from that agent.
    """
    # Get nodes to run
    combined_prompt = "\n".join([await to_prompt(p) for p in prompts])
    all_nodes = list(await self.pick_agents(combined_prompt))

    # Create list of streams that yield (agent, event) tuples
    agent_streams = [
        normalize_stream_for_teams(agent, *prompts, **kwargs)
        for agent in all_nodes
        if hasattr(agent, "run_stream")
    ]

    # Merge all agent streams
    async for agent_event_tuple in as_generated(agent_streams):
        yield agent_event_tuple
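
A consumption sketch for the merged stream, inside an async context, with team an existing Team instance. Events from all members arrive interleaved, each tagged with the node that produced it; the concrete event fields are not documented on this page, so only the type name is printed:

async for node, event in team.run_stream("Explain the architecture"):
    print(node.name, type(event).__name__)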
