TeamRun

Base classes

| Name     | Module                              | Description                             |
|----------|-------------------------------------|-----------------------------------------|
| BaseTeam | llmling_agent.delegation.base_team  | Base class for Team and TeamRun.        |
| Generic  | typing                              | Abstract base class for generic types.  |

⋔ Inheritance diagram

graph TD
  TeamRun["teamrun.TeamRun"]
  BaseTeam["base_team.BaseTeam"]
  MessageNode["messagenode.MessageNode"]
  MessageEmitter["messageemitter.MessageEmitter"]
  ABC["abc.ABC"]
  Object["builtins.object"]
  Generic["typing.Generic"]
  BaseTeam --> TeamRun
  MessageNode --> BaseTeam
  MessageEmitter --> MessageNode
  ABC --> MessageEmitter
  Object --> ABC
  Generic --> MessageEmitter
  Object --> Generic
  Generic --> MessageNode
  Generic --> BaseTeam
  Generic --> TeamRun

🛈 DocStrings

Bases: BaseTeam[TDeps, TResult]

Handles team operations with monitoring.

Source code in src/llmling_agent/delegation/teamrun.py
class TeamRun[TDeps, TResult](BaseTeam[TDeps, TResult]):
    """Handles team operations with monitoring."""

    @overload  # validator set: it defines the output
    def __init__(
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: MessageNode[Any, TResult],
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ): ...

    @overload
    def __init__(  # no validator, but all nodes same output type.
        self,
        agents: Sequence[MessageNode[TDeps, TResult]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ): ...

    @overload
    def __init__(
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: MessageNode[Any, TResult] | None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ): ...

    def __init__(
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: MessageNode[Any, TResult] | None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
        # result_mode: ResultMode = "last",
    ):
        super().__init__(
            agents,
            name=name,
            description=description,
            shared_prompt=shared_prompt,
            picker=picker,
            num_picks=num_picks,
            pick_prompt=pick_prompt,
        )
        self.validator = validator
        self.result_mode = "last"

    def __prompt__(self) -> str:
        """Format team info for prompts."""
        members = " -> ".join(a.name for a in self.agents)
        desc = f" - {self.description}" if self.description else ""
        return f"Sequential Team {self.name!r}{desc}\nPipeline: {members}"

    async def _run(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
        wait_for_connections: bool | None = None,
        message_id: str | None = None,
        conversation_id: str | None = None,
        **kwargs: Any,
    ) -> ChatMessage[TResult]:
        """Run agents sequentially and return combined message.

        This method wraps execute and extracts the final ChatMessage in order
        to fulfill the "message protocol".
        """
        message_id = message_id or str(uuid4())

        result = await self.execute(*prompts, **kwargs)
        all_messages = [r.message for r in result if r.message]
        assert all_messages, "Error during execution, returned None for TeamRun"
        # Determine content based on mode
        match self.result_mode:
            case "last":
                content = all_messages[-1].content
            # case "concat":
            #     content = "\n".join(msg.format() for msg in all_messages)
            case _:
                msg = f"Invalid result mode: {self.result_mode}"
                raise ValueError(msg)

        return ChatMessage(
            content=content,
            role="assistant",
            name=self.name,
            associated_messages=all_messages,
            message_id=message_id,
            conversation_id=conversation_id,
            metadata={
                "execution_order": [r.agent_name for r in result],
                "start_time": result.start_time.isoformat(),
                "errors": {name: str(error) for name, error in result.errors.items()},
            },
        )

    async def execute(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
        **kwargs: Any,
    ) -> TeamResponse[TResult]:
        """Start execution with optional monitoring."""
        self._team_talk.clear()
        start_time = get_now()
        final_prompt = list(prompts)
        if self.shared_prompt:
            final_prompt.insert(0, self.shared_prompt)

        responses = [
            i
            async for i in self.execute_iter(*final_prompt)
            if isinstance(i, AgentResponse)
        ]
        return TeamResponse(responses, start_time)

    async def run_iter(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
        **kwargs: Any,
    ) -> AsyncIterator[ChatMessage[Any]]:
        """Yield messages from the execution chain."""
        async for item in self.execute_iter(*prompts, **kwargs):
            match item:
                case AgentResponse():
                    if item.message:
                        yield item.message
                case Talk():
                    pass

    async def execute_iter(
        self,
        *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str],
        **kwargs: Any,
    ) -> AsyncIterator[Talk[Any] | AgentResponse[Any]]:
        from toprompt import to_prompt

        connections: list[Talk[Any]] = []
        try:
            combined_prompt = "\n".join([await to_prompt(p) for p in prompt])
            all_nodes = list(await self.pick_agents(combined_prompt))
            if self.validator:
                all_nodes.append(self.validator)
            first = all_nodes[0]
            connections = [s.connect_to(t, queued=True) for s, t in pairwise(all_nodes)]
            for conn in connections:
                self._team_talk.append(conn)

            # First agent
            start = perf_counter()
            message = await first.run(*prompt, **kwargs)
            timing = perf_counter() - start
            response = AgentResponse[Any](first.name, message=message, timing=timing)
            yield response

            # Process through chain
            for connection in connections:
                target = connection.targets[0]
                target_name = target.name
                yield connection

                # Let errors propagate - they break the chain
                start = perf_counter()
                messages = await connection.trigger()

                if target == all_nodes[-1]:
                    last_talk = Talk[Any](target, [], connection_type="run")
                    if response.message:
                        last_talk.stats.messages.append(response.message)
                    self._team_talk.append(last_talk)

                timing = perf_counter() - start
                msg = messages[0]
                response = AgentResponse[Any](target_name, message=msg, timing=timing)
                yield response

        finally:  # Always clean up connections
            for connection in connections:
                connection.disconnect()

    async def run_stream(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
        require_all: bool = True,
        **kwargs: Any,
    ) -> AsyncIterator[
        tuple[MessageNode[Any, Any], AgentStreamEvent | StreamCompleteEvent]
    ]:
        """Stream responses through the chain of team members.

        Args:
            prompts: Input prompts to process through the chain
            require_all: If True, fail if any agent fails. If False,
                         continue with remaining agents.
            kwargs: Additional arguments passed to each agent

        Yields:
            Tuples of (agent, event) where agent is the Agent instance
            and event is the streaming event.
        """
        from pydantic_ai import PartDeltaEvent, TextPartDelta

        from llmling_agent.agent.agent import StreamCompleteEvent

        current_message = prompts
        collected_content = []

        for agent in self.agents:
            try:
                agent_content = []

                # Use wrapper to normalize all streaming nodes to (agent, event) tuples
                def _raise_streaming_error(agent=agent):
                    msg = f"Agent {agent.name} does not support streaming"
                    raise ValueError(msg)  # noqa: TRY301

                if hasattr(agent, "run_stream"):
                    stream = normalize_stream_for_teams(agent, *current_message, **kwargs)
                else:
                    _raise_streaming_error()

                async for agent_event_tuple in stream:
                    actual_agent, event = agent_event_tuple
                    match event:
                        case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
                            agent_content.append(delta)
                            collected_content.append(delta)
                            yield (actual_agent, event)  # Yield tuple with agent context
                        case StreamCompleteEvent(message=message):
                            # Use complete response as input for next agent
                            current_message = (message.content,)
                            yield (actual_agent, event)  # Yield tuple with agent context
                        case _:
                            yield (actual_agent, event)  # Yield tuple with agent context

            except Exception as e:
                if require_all:
                    msg = f"Chain broken at {agent.name}: {e}"
                    logger.exception(msg)
                    raise ValueError(msg) from e
                logger.warning("Chain handler failed", name=agent.name, error=e)
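
The constructor mirrors the overloads above. A minimal construction sketch follows; the agent names and the `Agent(...)` arguments are illustrative assumptions, not taken from this page:

```python
from llmling_agent import Agent
from llmling_agent.delegation.teamrun import TeamRun

# Hypothetical member agents; names and model strings are placeholder assumptions.
analyzer = Agent(name="analyzer", model="openai:gpt-4o-mini")
writer = Agent(name="writer", model="openai:gpt-4o-mini")

# Members run as a pipeline: analyzer's output becomes writer's input.
team = TeamRun(
    [analyzer, writer],
    name="pipeline",
    description="Analyze then write",
    shared_prompt="Be concise.",
)
```

Per the first overload, passing a `validator` node would append a final validating step to the chain and fix the team's result type.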

__prompt__

__prompt__() -> str

Format team info for prompts.

Source code in src/llmling_agent/delegation/teamrun.py
def __prompt__(self) -> str:
    """Format team info for prompts."""
    members = " -> ".join(a.name for a in self.agents)
    desc = f" - {self.description}" if self.description else ""
    return f"Sequential Team {self.name!r}{desc}\nPipeline: {members}"

execute async

execute(
    *prompts: AnyPromptType | Image | PathLike[str] | None, **kwargs: Any
) -> TeamResponse[TResult]

Start execution with optional monitoring.

Source code in src/llmling_agent/delegation/teamrun.py
async def execute(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
    **kwargs: Any,
) -> TeamResponse[TResult]:
    """Start execution with optional monitoring."""
    self._team_talk.clear()
    start_time = get_now()
    final_prompt = list(prompts)
    if self.shared_prompt:
        final_prompt.insert(0, self.shared_prompt)

    responses = [
        i
        async for i in self.execute_iter(*final_prompt)
        if isinstance(i, AgentResponse)
    ]
    return TeamResponse(responses, start_time)
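
A usage sketch, assuming the team from the construction example; the prompt string is a placeholder. `TeamResponse` is iterated the same way `_run` iterates it above, so each entry is an `AgentResponse` carrying `agent_name`, `message`, and `timing`:

```python
async def review(team: TeamRun) -> None:
    result = await team.execute("Review this design document")
    for response in result:  # AgentResponse entries, in execution order
        if response.message:
            print(f"{response.agent_name} ({response.timing:.2f}s): "
                  f"{response.message.content}")
```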

run_iter async

run_iter(
    *prompts: AnyPromptType | Image | PathLike[str], **kwargs: Any
) -> AsyncIterator[ChatMessage[Any]]

Yield messages from the execution chain.

Source code in src/llmling_agent/delegation/teamrun.py
async def run_iter(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
    **kwargs: Any,
) -> AsyncIterator[ChatMessage[Any]]:
    """Yield messages from the execution chain."""
    async for item in self.execute_iter(*prompts, **kwargs):
        match item:
            case AgentResponse():
                if item.message:
                    yield item.message
            case Talk():
                pass
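
Compared to `execute`, `run_iter` yields each node's `ChatMessage` as soon as it is produced instead of collecting everything first. A consumption sketch (the prompt is a placeholder):

```python
async def print_chain(team: TeamRun) -> None:
    async for message in team.run_iter("Draft a changelog entry"):
        # One ChatMessage per node in the pipeline, in chain order.
        print(f"[{message.name}] {message.content}")
```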

run_stream async

run_stream(
    *prompts: AnyPromptType | Image | PathLike[str],
    require_all: bool = True,
    **kwargs: Any,
) -> AsyncIterator[tuple[MessageNode[Any, Any], AgentStreamEvent | StreamCompleteEvent]]

Stream responses through the chain of team members.

Parameters:

Name Type Description Default
prompts AnyPromptType | Image | PathLike[str]

Input prompts to process through the chain

()
require_all bool

If True, fail if any agent fails. If False, continue with remaining agents.

True
kwargs Any

Additional arguments passed to each agent

{}

Yields:

| Type                                                                                  | Description                                                                                   |
|---------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------|
| AsyncIterator[tuple[MessageNode[Any, Any], AgentStreamEvent \| StreamCompleteEvent]]   | Tuples of (agent, event) where agent is the Agent instance and event is the streaming event.  |

Source code in src/llmling_agent/delegation/teamrun.py
async def run_stream(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
    require_all: bool = True,
    **kwargs: Any,
) -> AsyncIterator[
    tuple[MessageNode[Any, Any], AgentStreamEvent | StreamCompleteEvent]
]:
    """Stream responses through the chain of team members.

    Args:
        prompts: Input prompts to process through the chain
        require_all: If True, fail if any agent fails. If False,
                     continue with remaining agents.
        kwargs: Additional arguments passed to each agent

    Yields:
        Tuples of (agent, event) where agent is the Agent instance
        and event is the streaming event.
    """
    from pydantic_ai import PartDeltaEvent, TextPartDelta

    from llmling_agent.agent.agent import StreamCompleteEvent

    current_message = prompts
    collected_content = []

    for agent in self.agents:
        try:
            agent_content = []

            # Use wrapper to normalize all streaming nodes to (agent, event) tuples
            def _raise_streaming_error(agent=agent):
                msg = f"Agent {agent.name} does not support streaming"
                raise ValueError(msg)  # noqa: TRY301

            if hasattr(agent, "run_stream"):
                stream = normalize_stream_for_teams(agent, *current_message, **kwargs)
            else:
                _raise_streaming_error()

            async for agent_event_tuple in stream:
                actual_agent, event = agent_event_tuple
                match event:
                    case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
                        agent_content.append(delta)
                        collected_content.append(delta)
                        yield (actual_agent, event)  # Yield tuple with agent context
                    case StreamCompleteEvent(message=message):
                        # Use complete response as input for next agent
                        current_message = (message.content,)
                        yield (actual_agent, event)  # Yield tuple with agent context
                    case _:
                        yield (actual_agent, event)  # Yield tuple with agent context

        except Exception as e:
            if require_all:
                msg = f"Chain broken at {agent.name}: {e}"
                logger.exception(msg)
                raise ValueError(msg) from e
            logger.warning("Chain handler failed", name=agent.name, error=e)
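
A consumer can pattern-match the yielded events exactly as the implementation does internally. A sketch, with a placeholder prompt; the imports mirror those in the source above:

```python
from pydantic_ai import PartDeltaEvent, TextPartDelta

from llmling_agent.agent.agent import StreamCompleteEvent


async def stream_team(team: TeamRun) -> None:
    async for node, event in team.run_stream("Explain the release notes"):
        match event:
            case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
                print(delta, end="", flush=True)  # live token deltas
            case StreamCompleteEvent():
                print(f"\n--- {node.name} done ---")  # node finished its turn
```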
