TeamRun

Base classes

Name      Module                              Description
BaseTeam  llmling_agent.delegation.base_team  Base class for Team and TeamRun.
Generic   typing                              Abstract base class for generic types.

⋔ Inheritance diagram

graph TD
  TeamRun["teamrun.TeamRun"]
  BaseTeam["base_team.BaseTeam"]
  MessageNode["messagenode.MessageNode"]
  ABC["abc.ABC"]
  object["builtins.object"]
  Generic["typing.Generic"]
  BaseTeam --> TeamRun
  MessageNode --> BaseTeam
  ABC --> MessageNode
  object --> ABC
  Generic --> MessageNode
  object --> Generic
  Generic --> BaseTeam
  Generic --> TeamRun

🛈 DocStrings

Bases: BaseTeam[TDeps, TResult]

Handles team operations with monitoring.
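
A minimal construction sketch, assuming analyzer, writer and reviewer are pre-built Agent/MessageNode instances created elsewhere (placeholders, not part of this page); the keyword arguments mirror the __init__ signature shown in the source below:

from llmling_agent.delegation.teamrun import TeamRun

# analyzer, writer and reviewer are assumed, pre-existing agents (placeholders).
team = TeamRun(
    [analyzer, writer],                 # members run in this order
    name="draft-pipeline",
    shared_prompt="Work through the user's request step by step.",
    validator=reviewer,                 # optional final node that defines TResult
)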

Source code in src/llmling_agent/delegation/teamrun.py
class TeamRun[TDeps, TResult](BaseTeam[TDeps, TResult]):
    """Handles team operations with monitoring."""

    @overload  # validator set: it defines the output
    def __init__(
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: MessageNode[Any, TResult],
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> None: ...

    @overload
    def __init__(  # no validator, but all nodes same output type.
        self,
        agents: Sequence[MessageNode[TDeps, TResult]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> None: ...

    @overload
    def __init__(
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: MessageNode[Any, TResult] | None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> None: ...

    def __init__(
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: MessageNode[Any, TResult] | None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
        # result_mode: ResultMode = "last",
    ) -> None:
        super().__init__(
            agents,
            name=name,
            description=description,
            shared_prompt=shared_prompt,
            picker=picker,
            num_picks=num_picks,
            pick_prompt=pick_prompt,
        )
        self.validator = validator
        self.result_mode = "last"

    def __prompt__(self) -> str:
        """Format team info for prompts."""
        members = " -> ".join(a.name for a in self.agents)
        desc = f" - {self.description}" if self.description else ""
        return f"Sequential Team {self.name!r}{desc}\nPipeline: {members}"

    async def run(
        self,
        *prompts: PromptCompatible | None,
        wait_for_connections: bool | None = None,
        store_history: bool = False,
        **kwargs: Any,
    ) -> ChatMessage[TResult]:
        """Run agents sequentially and return combined message."""
        # Prepare prompts and create user message
        user_msg, processed_prompts, original_message = await prepare_prompts(*prompts)
        self.message_received.emit(user_msg)

        # Execute sequential logic
        message_id = str(uuid4())  # Always generate unique response ID
        result = await self.execute(*processed_prompts, **kwargs)
        all_messages = [r.message for r in result if r.message]
        assert all_messages, "Error during execution, returned None for TeamRun"
        # Determine content based on mode
        match self.result_mode:
            case "last":
                content = all_messages[-1].content
            # case "concat":
            #     content = "\n".join(msg.format() for msg in all_messages)
            case _:
                msg = f"Invalid result mode: {self.result_mode}"
                raise ValueError(msg)

        message = ChatMessage(
            content=content,
            messages=[m for chat_message in all_messages for m in chat_message.messages],
            role="assistant",
            name=self.name,
            associated_messages=all_messages,
            message_id=message_id,
            conversation_id=user_msg.conversation_id,
            metadata={
                "execution_order": [r.agent_name for r in result],
                "start_time": result.start_time.isoformat(),
                "errors": {name: str(error) for name, error in result.errors.items()},
            },
        )

        # Teams typically don't store history by default, but allow it
        if store_history:
            # Teams could implement their own history management here if needed
            pass

        # Finalize and route message
        return await finalize_message(
            message,
            user_msg,
            self,
            self.connections,
            original_message,
            wait_for_connections,
        )

    async def execute(
        self,
        *prompts: PromptCompatible | None,
        **kwargs: Any,
    ) -> TeamResponse[TResult]:
        """Start execution with optional monitoring."""
        self._team_talk.clear()
        start_time = get_now()
        final_prompt = list(prompts)
        if self.shared_prompt:
            final_prompt.insert(0, self.shared_prompt)

        responses = [
            i async for i in self.execute_iter(*final_prompt) if isinstance(i, AgentResponse)
        ]
        return TeamResponse(responses, start_time)

    async def run_iter(
        self,
        *prompts: PromptCompatible,
        **kwargs: Any,
    ) -> AsyncIterator[ChatMessage[Any]]:
        """Yield messages from the execution chain."""
        async for item in self.execute_iter(*prompts, **kwargs):
            match item:
                case AgentResponse():
                    if item.message:
                        yield item.message
                case Talk():
                    pass

    async def execute_iter(
        self,
        *prompt: PromptCompatible,
        **kwargs: Any,
    ) -> AsyncIterator[Talk[Any] | AgentResponse[Any]]:
        from toprompt import to_prompt

        connections: list[Talk[Any]] = []
        try:
            combined_prompt = "\n".join([await to_prompt(p) for p in prompt])
            all_nodes = list(await self.pick_agents(combined_prompt))
            if self.validator:
                all_nodes.append(self.validator)
            first = all_nodes[0]
            connections = [s.connect_to(t, queued=True) for s, t in pairwise(all_nodes)]
            for conn in connections:
                self._team_talk.append(conn)

            # First agent
            start = perf_counter()
            message = await first.run(*prompt, **kwargs)
            timing = perf_counter() - start
            response = AgentResponse[Any](first.name, message=message, timing=timing)
            yield response

            # Process through chain
            for connection in connections:
                target = connection.targets[0]
                target_name = target.name
                yield connection

                # Let errors propagate - they break the chain
                start = perf_counter()
                messages = await connection.trigger()

                if target == all_nodes[-1]:
                    last_talk = Talk[Any](target, [], connection_type="run")
                    if response.message:
                        last_talk.stats.messages.append(response.message)
                    self._team_talk.append(last_talk)

                timing = perf_counter() - start
                msg = messages[0]
                response = AgentResponse[Any](target_name, message=msg, timing=timing)
                yield response

        finally:  # Always clean up connections
            for connection in connections:
                connection.disconnect()

    async def run_stream(
        self,
        *prompts: PromptCompatible,
        require_all: bool = True,
        **kwargs: Any,
    ) -> AsyncIterator[tuple[MessageNode[Any, Any], RichAgentStreamEvent[Any]]]:
        """Stream responses through the chain of team members.

        Args:
            prompts: Input prompts to process through the chain
            require_all: If True, fail if any agent fails. If False,
                         continue with remaining agents.
            kwargs: Additional arguments passed to each agent

        Yields:
            Tuples of (agent, event) where agent is the Agent instance
            and event is the streaming event.
        """
        from pydantic_ai import PartDeltaEvent, TextPartDelta

        from llmling_agent.agent.events import StreamCompleteEvent

        current_message = prompts
        collected_content = []

        for agent in self.agents:
            try:
                agent_content = []

                # Use wrapper to normalize all streaming nodes to (agent, event) tuples
                def _raise_streaming_error(agent: MessageNode[Any, Any] = agent) -> None:
                    msg = f"Agent {agent.name} does not support streaming"
                    raise ValueError(msg)  # noqa: TRY301

                if hasattr(agent, "run_stream"):
                    stream = normalize_stream_for_teams(agent, *current_message, **kwargs)
                else:
                    _raise_streaming_error()

                async for agent_event_tuple in stream:
                    actual_agent, event = agent_event_tuple
                    match event:
                        case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
                            agent_content.append(delta)
                            collected_content.append(delta)
                            yield (actual_agent, event)  # Yield tuple with agent context
                        case StreamCompleteEvent(message=message):
                            # Use complete response as input for next agent
                            current_message = (message.content,)
                            yield (actual_agent, event)  # Yield tuple with agent context
                        case _:
                            yield (actual_agent, event)  # Yield tuple with agent context

            except Exception as e:
                if require_all:
                    msg = f"Chain broken at {agent.name}: {e}"
                    logger.exception(msg)
                    raise ValueError(msg) from e
                logger.warning("Chain handler failed", name=agent.name, error=e)

__prompt__

__prompt__() -> str

Format team info for prompts.

Source code in src/llmling_agent/delegation/teamrun.py
def __prompt__(self) -> str:
    """Format team info for prompts."""
    members = " -> ".join(a.name for a in self.agents)
    desc = f" - {self.description}" if self.description else ""
    return f"Sequential Team {self.name!r}{desc}\nPipeline: {members}"
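
For the team from the construction sketch above (name "draft-pipeline", members analyzer and writer, no description), the rendered prompt would look roughly like:

>>> team.__prompt__()
"Sequential Team 'draft-pipeline'\nPipeline: analyzer -> writer"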

execute async

execute(*prompts: PromptCompatible | None, **kwargs: Any) -> TeamResponse[TResult]

Start execution with optional monitoring.

Source code in src/llmling_agent/delegation/teamrun.py
async def execute(
    self,
    *prompts: PromptCompatible | None,
    **kwargs: Any,
) -> TeamResponse[TResult]:
    """Start execution with optional monitoring."""
    self._team_talk.clear()
    start_time = get_now()
    final_prompt = list(prompts)
    if self.shared_prompt:
        final_prompt.insert(0, self.shared_prompt)

    responses = [
        i async for i in self.execute_iter(*final_prompt) if isinstance(i, AgentResponse)
    ]
    return TeamResponse(responses, start_time)
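
A usage sketch (inside an async context, reusing the team from the construction example): execute returns a TeamResponse built from the AgentResponse items yielded by execute_iter, so per-member results and timings can be inspected afterwards:

# inside an async function
result = await team.execute("Summarize the quarterly report")
for agent_response in result:
    # each AgentResponse carries the member's name, its message and timing
    print(agent_response.agent_name, agent_response.timing)
print("started at:", result.start_time.isoformat())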

run async

run(
    *prompts: PromptCompatible | None,
    wait_for_connections: bool | None = None,
    store_history: bool = False,
    **kwargs: Any
) -> ChatMessage[TResult]

Run agents sequentially and return combined message.

Source code in src/llmling_agent/delegation/teamrun.py
async def run(
    self,
    *prompts: PromptCompatible | None,
    wait_for_connections: bool | None = None,
    store_history: bool = False,
    **kwargs: Any,
) -> ChatMessage[TResult]:
    """Run agents sequentially and return combined message."""
    # Prepare prompts and create user message
    user_msg, processed_prompts, original_message = await prepare_prompts(*prompts)
    self.message_received.emit(user_msg)

    # Execute sequential logic
    message_id = str(uuid4())  # Always generate unique response ID
    result = await self.execute(*processed_prompts, **kwargs)
    all_messages = [r.message for r in result if r.message]
    assert all_messages, "Error during execution, returned None for TeamRun"
    # Determine content based on mode
    match self.result_mode:
        case "last":
            content = all_messages[-1].content
        # case "concat":
        #     content = "\n".join(msg.format() for msg in all_messages)
        case _:
            msg = f"Invalid result mode: {self.result_mode}"
            raise ValueError(msg)

    message = ChatMessage(
        content=content,
        messages=[m for chat_message in all_messages for m in chat_message.messages],
        role="assistant",
        name=self.name,
        associated_messages=all_messages,
        message_id=message_id,
        conversation_id=user_msg.conversation_id,
        metadata={
            "execution_order": [r.agent_name for r in result],
            "start_time": result.start_time.isoformat(),
            "errors": {name: str(error) for name, error in result.errors.items()},
        },
    )

    # Teams typically don't store history by default, but allow it
    if store_history:
        # Teams could implement their own history management here if needed
        pass

    # Finalize and route message
    return await finalize_message(
        message,
        user_msg,
        self,
        self.connections,
        original_message,
        wait_for_connections,
    )
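
A usage sketch for a full sequential run (inside an async context, team as above); the combined ChatMessage exposes the last member's content plus the execution metadata assembled in the method body:

# inside an async function
final = await team.run("Summarize the quarterly report")
print(final.content)                        # content of the last member (result_mode "last")
print(final.metadata["execution_order"])    # member names in the order they ran
print(final.metadata["errors"])             # per-member errors, stringified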

run_iter async

run_iter(*prompts: PromptCompatible, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]

Yield messages from the execution chain.

Source code in src/llmling_agent/delegation/teamrun.py
async def run_iter(
    self,
    *prompts: PromptCompatible,
    **kwargs: Any,
) -> AsyncIterator[ChatMessage[Any]]:
    """Yield messages from the execution chain."""
    async for item in self.execute_iter(*prompts, **kwargs):
        match item:
            case AgentResponse():
                if item.message:
                    yield item.message
            case Talk():
                pass
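
A small iteration sketch (async context, team as above): run_iter surfaces each member's ChatMessage as it is produced and silently skips the Talk connection objects:

# inside an async function
async for message in team.run_iter("Summarize the quarterly report"):
    print(f"{message.name}: {message.content}")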

run_stream async

run_stream(
    *prompts: PromptCompatible, require_all: bool = True, **kwargs: Any
) -> AsyncIterator[tuple[MessageNode[Any, Any], RichAgentStreamEvent[Any]]]

Stream responses through the chain of team members.

Parameters:

Name         Type              Description                                                                    Default
prompts      PromptCompatible  Input prompts to process through the chain                                     ()
require_all  bool              If True, fail if any agent fails. If False, continue with remaining agents.    True
kwargs       Any               Additional arguments passed to each agent                                      {}

Yields:

Type                                                                     Description
AsyncIterator[tuple[MessageNode[Any, Any], RichAgentStreamEvent[Any]]]   Tuples of (agent, event) where agent is the Agent instance and event is the streaming event.

Source code in src/llmling_agent/delegation/teamrun.py
async def run_stream(
    self,
    *prompts: PromptCompatible,
    require_all: bool = True,
    **kwargs: Any,
) -> AsyncIterator[tuple[MessageNode[Any, Any], RichAgentStreamEvent[Any]]]:
    """Stream responses through the chain of team members.

    Args:
        prompts: Input prompts to process through the chain
        require_all: If True, fail if any agent fails. If False,
                     continue with remaining agents.
        kwargs: Additional arguments passed to each agent

    Yields:
        Tuples of (agent, event) where agent is the Agent instance
        and event is the streaming event.
    """
    from pydantic_ai import PartDeltaEvent, TextPartDelta

    from llmling_agent.agent.events import StreamCompleteEvent

    current_message = prompts
    collected_content = []

    for agent in self.agents:
        try:
            agent_content = []

            # Use wrapper to normalize all streaming nodes to (agent, event) tuples
            def _raise_streaming_error(agent: MessageNode[Any, Any] = agent) -> None:
                msg = f"Agent {agent.name} does not support streaming"
                raise ValueError(msg)  # noqa: TRY301

            if hasattr(agent, "run_stream"):
                stream = normalize_stream_for_teams(agent, *current_message, **kwargs)
            else:
                _raise_streaming_error()

            async for agent_event_tuple in stream:
                actual_agent, event = agent_event_tuple
                match event:
                    case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
                        agent_content.append(delta)
                        collected_content.append(delta)
                        yield (actual_agent, event)  # Yield tuple with agent context
                    case StreamCompleteEvent(message=message):
                        # Use complete response as input for next agent
                        current_message = (message.content,)
                        yield (actual_agent, event)  # Yield tuple with agent context
                    case _:
                        yield (actual_agent, event)  # Yield tuple with agent context

        except Exception as e:
            if require_all:
                msg = f"Chain broken at {agent.name}: {e}"
                logger.exception(msg)
                raise ValueError(msg) from e
            logger.warning("Chain handler failed", name=agent.name, error=e)
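
A streaming sketch (async context, team as above): every item is an (agent, event) tuple, so text deltas can be printed per member while the complete response is forwarded internally to the next member; the event imports mirror those used in the method body:

from pydantic_ai import PartDeltaEvent, TextPartDelta

from llmling_agent.agent.events import StreamCompleteEvent

# inside an async function
async for node, event in team.run_stream("Summarize the quarterly report"):
    match event:
        case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
            print(f"[{node.name}] {delta}", end="")     # incremental text from this member
        case StreamCompleteEvent(message=message):
            print(f"\n-- {node.name} finished --")      # full message becomes the next member's input
        case _:
            pass                                        # other events are passed through unchanged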
