Skip to content

TeamRun

Base classes

Name Children Inherits
BaseTeam
llmling_agent.delegation.base_team
Base class for Team and TeamRun.
Generic
typing
Abstract base class for generic types.

⋔ Inheritance diagram

graph TD
  94004562482848["teamrun.TeamRun"]
  94004563200544["base_team.BaseTeam"]
  94004562236768["messagenode.MessageNode"]
  94004562935056["messageemitter.MessageEmitter"]
  94004562981872["tasks.TaskManagerMixin"]
  140104485245120["builtins.object"]
  94004506135904["abc.ABC"]
  94004505984624["typing.Generic"]
  94004563200544 --> 94004562482848
  94004562236768 --> 94004563200544
  94004562935056 --> 94004562236768
  94004562981872 --> 94004562935056
  140104485245120 --> 94004562981872
  94004506135904 --> 94004562935056
  140104485245120 --> 94004506135904
  94004505984624 --> 94004562935056
  140104485245120 --> 94004505984624
  94004505984624 --> 94004562236768
  94004505984624 --> 94004563200544
  94004505984624 --> 94004562482848

🛈 DocStrings

Bases: BaseTeam[TDeps, TResult]

Handles team operations with monitoring.

Source code in src/llmling_agent/delegation/teamrun.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
class TeamRun[TDeps, TResult](BaseTeam[TDeps, TResult]):
    """Handles team operations with monitoring.

    The team's nodes are executed one after another: ``execute_iter`` connects
    each node to its successor, runs the first node with the given prompts and
    then triggers every connection in turn. When a ``validator`` node is set,
    it is appended as the last step of that chain.
    """

    def __init__(
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: MessageNode[Any, TResult] | None = None,
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
        # result_mode: ResultMode = "last",
    ):
        """Create the team.

        Args:
            agents: Member nodes, passed on to ``BaseTeam.__init__``.
            name: Passed on to ``BaseTeam.__init__``.
            description: Passed on to ``BaseTeam.__init__``.
            shared_prompt: Passed on to ``BaseTeam.__init__``; ``execute``
                prepends it to the prompts when it is truthy.
            validator: Optional node that ``execute_iter`` appends to the end
                of the chain.
            picker: Passed on to ``BaseTeam.__init__``.
            num_picks: Passed on to ``BaseTeam.__init__``.
            pick_prompt: Passed on to ``BaseTeam.__init__``.
        """
        super().__init__(
            agents,
            name=name,
            description=description,
            shared_prompt=shared_prompt,
            picker=picker,
            num_picks=num_picks,
            pick_prompt=pick_prompt,
        )
        self.validator = validator
        # Fixed for now: "last" is the only mode that _run knows how to handle.
        self.result_mode = "last"

    def __prompt__(self) -> str:
        """Format team info for prompts.

        Returns:
            Two lines: the team name (plus the description, if any) and the
            member names joined with " -> ".
        """
        members = " -> ".join(a.name for a in self.agents)
        desc = f" - {self.description}" if self.description else ""
        return f"Sequential Team '{self.name}'{desc}\nPipeline: {members}"

    async def _run(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
        wait_for_connections: bool | None = None,
        message_id: str | None = None,
        conversation_id: str | None = None,
        **kwargs: Any,
    ) -> ChatMessage[TResult]:
        """Run agents sequentially and return combined message.

        This method wraps execute and extracts the ChatMessage in order to fulfill
        the "message protocol".

        Args:
            prompts: Prompts handed to ``execute``.
            wait_for_connections: Not read by this method.
            message_id: Id for the returned message; a ``uuid4`` string is
                generated when it is falsy.
            conversation_id: Stored unchanged on the returned message.
            **kwargs: Passed on to ``execute``.

        Returns:
            An "assistant" message named after the team whose content is the
            content of the last message produced, with all produced messages
            attached as ``associated_messages``.

        Raises:
            AssertionError: If no response carried a message.
            ValueError: If ``self.result_mode`` is not "last".
        """
        message_id = message_id or str(uuid4())

        result = await self.execute(*prompts, **kwargs)
        # Responses without a message are skipped.
        all_messages = [r.message for r in result if r.message]
        assert all_messages, "Error during execution, returned None for TeamRun"
        # Determine content based on mode
        match self.result_mode:
            case "last":
                content = all_messages[-1].content
            # case "concat":
            #     content = "\n".join(msg.format() for msg in all_messages)
            case _:
                msg = f"Invalid result mode: {self.result_mode}"
                raise ValueError(msg)

        return ChatMessage(
            content=content,
            role="assistant",
            name=self.name,
            associated_messages=all_messages,
            message_id=message_id,
            conversation_id=conversation_id,
            metadata={
                "execution_order": [r.agent_name for r in result],
                "start_time": result.start_time.isoformat(),
                "errors": {name: str(error) for name, error in result.errors.items()},
            },
        )

    async def execute(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
        **kwargs: Any,
    ) -> TeamResponse[TResult]:
        """Start execution with optional monitoring.

        Clears ``self._team_talk``, prepends ``self.shared_prompt`` (when
        truthy) to the prompts, drives ``execute_iter`` to completion and keeps
        only the ``AgentResponse`` items it yields.

        Returns:
            A ``TeamResponse`` built from the collected responses and the
            time at which this call started.
        """
        self._team_talk.clear()
        start_time = datetime.now()
        final_prompt = list(prompts)
        if self.shared_prompt:
            final_prompt.insert(0, self.shared_prompt)

        # NOTE(review): **kwargs is accepted but not passed to execute_iter
        # here, whereas run_iter does forward it - confirm this is intentional.
        responses = [
            i
            async for i in self.execute_iter(*final_prompt)
            if isinstance(i, AgentResponse)
        ]
        return TeamResponse(responses, start_time)

    async def run_iter(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
        **kwargs: Any,
    ) -> AsyncIterator[ChatMessage[Any]]:
        """Yield messages from the execution chain.

        Only ``AgentResponse`` items that carry a message produce output;
        ``Talk`` items yielded by ``execute_iter`` are ignored.
        """
        async for item in self.execute_iter(*prompts, **kwargs):
            match item:
                case AgentResponse():
                    if item.message:
                        yield item.message
                case Talk():
                    pass

    async def execute_iter(
        self,
        *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str],
        **kwargs: Any,
    ) -> AsyncIterator[Talk[Any] | AgentResponse[Any]]:
        """Run the chain step by step, yielding connections and responses.

        Yields the first node's ``AgentResponse``, then for every connection
        the ``Talk`` object itself followed by the ``AgentResponse`` of its
        target. All connections created here are disconnected when the
        generator finishes or fails.

        Args:
            prompt: Prompts given to the first node's ``run``.
            **kwargs: Passed on to the first node's ``run``.
        """
        from toprompt import to_prompt

        connections: list[Talk[Any]] = []
        try:
            # The newline-joined text form of the prompts is only used to pick
            # the nodes; the first node receives the original prompt objects.
            combined_prompt = "\n".join([await to_prompt(p) for p in prompt])
            all_nodes = list(await self.pick_agents(combined_prompt))
            if self.validator:
                all_nodes.append(self.validator)
            # NOTE(review): raises IndexError when pick_agents returns nothing
            # and no validator is set - confirm that cannot happen.
            first = all_nodes[0]
            # One connection per consecutive pair of nodes.
            connections = [
                source.connect_to(target, queued=True)
                for source, target in pairwise(all_nodes)
            ]
            for conn in connections:
                self._team_talk.append(conn)

            # First agent
            start = perf_counter()
            message = await first.run(*prompt, **kwargs)
            timing = perf_counter() - start
            response = AgentResponse[Any](first.name, message=message, timing=timing)
            yield response

            # Process through chain
            for connection in connections:
                target = connection.targets[0]
                target_name = target.name
                # The connection is yielded before it is triggered.
                yield connection

                # Let errors propagate - they break the chain
                start = perf_counter()
                messages = await connection.trigger()

                # If this is the last node
                if target == all_nodes[-1]:
                    # Add a Talk entry for the final node itself and attach the
                    # previous step's message (``response`` has not been
                    # reassigned yet at this point) to its stats.
                    last_talk = Talk[Any](target, [], connection_type="run")
                    if response.message:
                        last_talk.stats.messages.append(response.message)
                    self._team_talk.append(last_talk)

                # Measured after the bookkeeping above, so it is included.
                timing = perf_counter() - start
                msg = messages[0]
                response = AgentResponse[Any](target_name, message=msg, timing=timing)
                yield response

        finally:
            # Always clean up connections
            for connection in connections:
                connection.disconnect()

    @asynccontextmanager
    async def chain_stream(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
        require_all: bool = True,
        **kwargs: Any,
    ) -> AsyncIterator[StreamingResponseProtocol]:
        """Stream results through chain of team members.

        Opens ``run_stream`` on every member of ``self.agents`` in order; once
        a stream reports completion, its ``formatted_content`` becomes the
        input of the next member. Yields one object that exposes the collected
        streams through a single ``stream()`` method.

        Args:
            prompts: Input for the first member.
            require_all: When True, a failure in any member raises
                ``ValueError``; otherwise the failure is logged and the loop
                moves on to the next member.
            **kwargs: Passed on to every member's ``run_stream``.

        Raises:
            ValueError: If a member fails while ``require_all`` is True.
        """
        from llmling_agent.agent import Agent, StructuredAgent
        from llmling_agent.delegation import TeamRun
        from llmling_agent_providers.base import StreamingResponseProtocol

        # All member streams are entered on one exit stack, so they stay open
        # until this context manager exits.
        async with AsyncExitStack() as stack:
            streams: list[StreamingResponseProtocol[str]] = []
            current_message = prompts

            # Set up all streams
            for agent in self.agents:
                try:
                    # A failing assert is an Exception too, so it is handled by
                    # the except clause below like any other failure.
                    assert isinstance(agent, TeamRun | Agent | StructuredAgent), (
                        "Cannot stream teams!"
                    )
                    stream = await stack.enter_async_context(
                        agent.run_stream(*current_message, **kwargs)
                    )
                    streams.append(stream)  # type: ignore
                    # Wait for complete response for next agent
                    # NOTE(review): if the loop ends without is_complete ever
                    # being true, current_message stays a bare chunk and is
                    # unpacked with * for the next agent - confirm.
                    async for chunk in stream.stream():
                        current_message = chunk
                        if stream.is_complete:
                            current_message = (stream.formatted_content,)  # type: ignore
                            break
                except Exception as e:
                    if require_all:
                        msg = f"Chain broken at {agent.name}: {e}"
                        raise ValueError(msg) from e
                    logger.warning("Chain handler %s failed: %s", agent.name, e)

            # Create a stream-like interface for the chain
            class ChainStream(StreamingResponseProtocol[str]):
                def __init__(self):
                    # Shares the list of streams collected above.
                    self.streams = streams
                    self.current_stream_idx = 0
                    self.is_complete = False
                    self.model_name = None

                def usage(self) -> Usage:
                    # Always reports zero for all three counters.
                    @dataclass
                    class Usage:
                        total_tokens: int | None
                        request_tokens: int | None
                        response_tokens: int | None

                    return Usage(0, 0, 0)

                async def stream(self) -> AsyncIterator[str]:  # type: ignore
                    # Iterates the collected streams in order and passes their
                    # chunks through. NOTE(review): these streams were already
                    # iterated during setup above; what a second stream() call
                    # yields depends on the provider - confirm.
                    for idx, stream in enumerate(self.streams):
                        self.current_stream_idx = idx
                        async for chunk in stream.stream():
                            yield chunk
                            # Complete once the last stream says it is complete.
                            if idx == len(self.streams) - 1 and stream.is_complete:
                                self.is_complete = True

            yield ChainStream()

    @asynccontextmanager
    async def run_stream(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
        **kwargs: Any,
    ) -> AsyncIterator[StreamingResponseProtocol[TResult]]:
        """Stream responses through the chain.

        Provides same interface as Agent.run_stream. Delegates to
        ``chain_stream`` with the same prompts and keyword arguments and
        yields the stream object it provides.
        """
        async with self.chain_stream(*prompts, **kwargs) as stream:
            yield stream

__prompt__

__prompt__() -> str

Format team info for prompts.

Source code in src/llmling_agent/delegation/teamrun.py
87
88
89
90
91
def __prompt__(self) -> str:
    """Describe this sequential team for use inside a prompt.

    Returns:
        A two-line string: the quoted team name, followed by " - " and the
        description when one is set, then a "Pipeline:" line listing the
        member names in order, separated by " -> ".
    """
    headline = f"Sequential Team '{self.name}'"
    if self.description:
        headline = f"{headline} - {self.description}"
    pipeline = " -> ".join(member.name for member in self.agents)
    return f"{headline}\nPipeline: {pipeline}"

_run async

_run(
    *prompts: AnyPromptType | Image | PathLike[str] | None,
    wait_for_connections: bool | None = None,
    message_id: str | None = None,
    conversation_id: str | None = None,
    **kwargs: Any,
) -> ChatMessage[TResult]

Run agents sequentially and return combined message.

This method wraps execute and extracts the ChatMessage in order to fulfill the "message protocol".

Source code in src/llmling_agent/delegation/teamrun.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
async def _run(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
    wait_for_connections: bool | None = None,
    message_id: str | None = None,
    conversation_id: str | None = None,
    **kwargs: Any,
) -> ChatMessage[TResult]:
    """Run the whole pipeline and wrap its outcome in a single ChatMessage.

    Delegates to ``execute`` and condenses the collected responses into one
    message, so that the team satisfies the "message protocol".

    Args:
        prompts: Prompts handed to ``execute``.
        wait_for_connections: Not read by this method.
        message_id: Id for the returned message; a ``uuid4`` string is
            generated when it is falsy.
        conversation_id: Stored unchanged on the returned message.
        **kwargs: Passed on to ``execute``.

    Returns:
        An "assistant" message named after the team. Its content is that of
        the last produced message, every produced message is attached as
        ``associated_messages``, and the metadata holds the execution order,
        the start time in ISO format and the stringified errors.

    Raises:
        AssertionError: If no response carried a message.
        ValueError: If ``self.result_mode`` is anything other than "last".
    """
    if not message_id:
        message_id = str(uuid4())

    team_response = await self.execute(*prompts, **kwargs)
    # Keep only the steps that actually produced a message.
    produced = [step.message for step in team_response if step.message]
    assert produced, "Error during execution, returned None for TeamRun"

    # "last" is the only supported mode: the final message's content wins.
    if self.result_mode != "last":
        msg = f"Invalid result mode: {self.result_mode}"
        raise ValueError(msg)
    content = produced[-1].content

    run_metadata = {
        "execution_order": [step.agent_name for step in team_response],
        "start_time": team_response.start_time.isoformat(),
        "errors": {
            node_name: str(exc) for node_name, exc in team_response.errors.items()
        },
    }
    return ChatMessage(
        content=content,
        role="assistant",
        name=self.name,
        associated_messages=produced,
        message_id=message_id,
        conversation_id=conversation_id,
        metadata=run_metadata,
    )

chain_stream async

chain_stream(
    *prompts: AnyPromptType | Image | PathLike[str] | None,
    require_all: bool = True,
    **kwargs: Any,
) -> AsyncIterator[StreamingResponseProtocol]

Stream results through chain of team members.

Source code in src/llmling_agent/delegation/teamrun.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
@asynccontextmanager
async def chain_stream(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
    require_all: bool = True,
    **kwargs: Any,
) -> AsyncIterator[StreamingResponseProtocol]:
    """Stream results through chain of team members.

    Opens ``run_stream`` on every member of ``self.agents`` in order; once a
    stream reports completion, its ``formatted_content`` becomes the input of
    the next member. Yields one object that exposes the collected streams
    through a single ``stream()`` method.

    Args:
        prompts: Input for the first member.
        require_all: When True, a failure in any member raises ``ValueError``;
            otherwise the failure is logged and the loop moves on to the next
            member.
        **kwargs: Passed on to every member's ``run_stream``.

    Raises:
        ValueError: If a member fails while ``require_all`` is True.
    """
    from llmling_agent.agent import Agent, StructuredAgent
    from llmling_agent.delegation import TeamRun
    from llmling_agent_providers.base import StreamingResponseProtocol

    # All member streams are entered on one exit stack, so they stay open
    # until this context manager exits.
    async with AsyncExitStack() as stack:
        streams: list[StreamingResponseProtocol[str]] = []
        current_message = prompts

        # Set up all streams
        for agent in self.agents:
            try:
                # A failing assert is an Exception too, so it is handled by
                # the except clause below like any other failure.
                assert isinstance(agent, TeamRun | Agent | StructuredAgent), (
                    "Cannot stream teams!"
                )
                stream = await stack.enter_async_context(
                    agent.run_stream(*current_message, **kwargs)
                )
                streams.append(stream)  # type: ignore
                # Wait for complete response for next agent
                # NOTE(review): if the loop ends without is_complete ever
                # being true, current_message stays a bare chunk and is
                # unpacked with * for the next agent - confirm.
                async for chunk in stream.stream():
                    current_message = chunk
                    if stream.is_complete:
                        current_message = (stream.formatted_content,)  # type: ignore
                        break
            except Exception as e:
                if require_all:
                    msg = f"Chain broken at {agent.name}: {e}"
                    raise ValueError(msg) from e
                logger.warning("Chain handler %s failed: %s", agent.name, e)

        # Create a stream-like interface for the chain
        class ChainStream(StreamingResponseProtocol[str]):
            def __init__(self):
                # Shares the list of streams collected above.
                self.streams = streams
                self.current_stream_idx = 0
                self.is_complete = False
                self.model_name = None

            def usage(self) -> Usage:
                # Always reports zero for all three counters.
                @dataclass
                class Usage:
                    total_tokens: int | None
                    request_tokens: int | None
                    response_tokens: int | None

                return Usage(0, 0, 0)

            async def stream(self) -> AsyncIterator[str]:  # type: ignore
                # Iterates the collected streams in order and passes their
                # chunks through. NOTE(review): these streams were already
                # iterated during setup above; what a second stream() call
                # yields depends on the provider - confirm.
                for idx, stream in enumerate(self.streams):
                    self.current_stream_idx = idx
                    async for chunk in stream.stream():
                        yield chunk
                        # Complete once the last stream says it is complete.
                        if idx == len(self.streams) - 1 and stream.is_complete:
                            self.is_complete = True

        yield ChainStream()

execute async

execute(
    *prompts: AnyPromptType | Image | PathLike[str] | None, **kwargs: Any
) -> TeamResponse[TResult]

Start execution with optional monitoring.

Source code in src/llmling_agent/delegation/teamrun.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
async def execute(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
    **kwargs: Any,
) -> TeamResponse[TResult]:
    """Run the full chain once and collect every agent response.

    Resets ``self._team_talk``, puts ``self.shared_prompt`` (when truthy) in
    front of the given prompts and drives ``execute_iter`` to completion,
    keeping only the ``AgentResponse`` items it yields.

    Args:
        prompts: Prompts for the chain.
        **kwargs: Accepted, but not used by this method.

    Returns:
        A ``TeamResponse`` built from the collected responses and the time at
        which this call started.
    """
    self._team_talk.clear()
    started_at = datetime.now()

    if self.shared_prompt:
        chain_input = [self.shared_prompt, *prompts]
    else:
        chain_input = list(prompts)

    # NOTE(review): kwargs is deliberately left out of this call to keep the
    # existing behavior; confirm whether it should be forwarded.
    collected = []
    async for item in self.execute_iter(*chain_input):
        if isinstance(item, AgentResponse):
            collected.append(item)
    return TeamResponse(collected, started_at)

run_iter async

run_iter(
    *prompts: AnyPromptType | Image | PathLike[str], **kwargs: Any
) -> AsyncIterator[ChatMessage[Any]]

Yield messages from the execution chain.

Source code in src/llmling_agent/delegation/teamrun.py
154
155
156
157
158
159
160
161
162
163
164
165
166
async def run_iter(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
    **kwargs: Any,
) -> AsyncIterator[ChatMessage[Any]]:
    """Iterate over the messages produced while the chain executes.

    Wraps ``execute_iter`` (prompts and keyword arguments are passed through)
    and yields the message of every ``AgentResponse`` that carries one.
    Anything else coming out of ``execute_iter`` is skipped.
    """
    async for step in self.execute_iter(*prompts, **kwargs):
        if isinstance(step, AgentResponse) and step.message:
            yield step.message

run_stream async

run_stream(
    *prompts: AnyPromptType | Image | PathLike[str], **kwargs: Any
) -> AsyncIterator[StreamingResponseProtocol[TResult]]

Stream responses through the chain.

Provides same interface as Agent.run_stream.

Source code in src/llmling_agent/delegation/teamrun.py
288
289
290
291
292
293
294
295
296
297
298
299
@asynccontextmanager
async def run_stream(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
    **kwargs: Any,
) -> AsyncIterator[StreamingResponseProtocol[TResult]]:
    """Stream responses through the chain.

    Provides same interface as Agent.run_stream.
    """
    async with self.chain_stream(*prompts, **kwargs) as stream:
        yield stream

Show source on GitHub