
teamrun

Class info

Classes

Name               Module                               Description
AgentResponse      llmling_agent.messaging.messages     Result from an agent's execution.
BaseTeam           llmling_agent.delegation.base_team   Base class for Team and TeamRun.
ChatMessage        llmling_agent.messaging.messages     Common message format for all UI types.
ExtendedTeamTalk   llmling_agent.delegation.teamrun     TeamTalk that also provides TeamRunStats interface.
SupportsRunStream  llmling_agent.common_types           Protocol for nodes that support streaming via run_stream().
Talk               llmling_agent.talk.talk              Manages message flow between agents/groups.
TeamResponse       llmling_agent.messaging.messages     Results from a team execution.
TeamRun            llmling_agent.delegation.teamrun     Handles team operations with monitoring.
TeamTalk           llmling_agent.talk.talk              Group of connections with aggregate operations.


Sequential, ordered group of agents / nodes.

ExtendedTeamTalk dataclass

Bases: TeamTalk

TeamTalk that also provides TeamRunStats interface.

Source code in src/llmling_agent/delegation/teamrun.py (lines 38-51)

    @dataclass(frozen=True, kw_only=True)
    class ExtendedTeamTalk(TeamTalk):
        """TeamTalk that also provides TeamRunStats interface."""

        errors: list[tuple[str, str, datetime]] = field(default_factory=list)

        def clear(self) -> None:
            """Reset all tracking data."""
            super().clear()  # Clear base TeamTalk
            self.errors.clear()

        def add_error(self, agent: str, error: str) -> None:
            """Track errors from AgentResponses."""
            self.errors.append((agent, error, get_now()))
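
A short usage sketch for reading the error log after a pipeline run. It goes through the private _team_talk attribute that appears in the TeamRun source below, since this page shows no public accessor; treat that attribute access as illustrative:

    # Illustrative only: `team` is a TeamRun built as in the later examples;
    # `_team_talk` is the ExtendedTeamTalk instance seen in TeamRun's source.
    talk = team._team_talk
    for agent_name, error_text, timestamp in talk.errors:
        print(f"{timestamp:%Y-%m-%d %H:%M:%S} {agent_name}: {error_text}")
    talk.clear()  # resets connection tracking and empties the error list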
                

add_error

    add_error(agent: str, error: str) -> None

Track errors from AgentResponses.

Source code in src/llmling_agent/delegation/teamrun.py (lines 49-51)

    def add_error(self, agent: str, error: str) -> None:
        """Track errors from AgentResponses."""
        self.errors.append((agent, error, get_now()))
                

clear

    clear() -> None

Reset all tracking data.

Source code in src/llmling_agent/delegation/teamrun.py (lines 44-47)

    def clear(self) -> None:
        """Reset all tracking data."""
        super().clear()  # Clear base TeamTalk
        self.errors.clear()
                

TeamRun

Bases: BaseTeam[TDeps, TResult]

Handles team operations with monitoring.

Source code in src/llmling_agent/delegation/teamrun.py (lines 54-317)

    class TeamRun[TDeps, TResult](BaseTeam[TDeps, TResult]):
        """Handles team operations with monitoring."""

        @overload  # validator set: it defines the output
        def __init__(
            self,
            agents: Sequence[MessageNode[TDeps, Any]],
            *,
            name: str | None = None,
            description: str | None = None,
            display_name: str | None = None,
            shared_prompt: str | None = None,
            validator: MessageNode[Any, TResult],
            picker: SupportsStructuredOutput | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> None: ...

        @overload
        def __init__(  # no validator, but all nodes same output type.
            self,
            agents: Sequence[MessageNode[TDeps, TResult]],
            *,
            name: str | None = None,
            description: str | None = None,
            display_name: str | None = None,
            shared_prompt: str | None = None,
            validator: None = None,
            picker: SupportsStructuredOutput | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> None: ...

        @overload
        def __init__(
            self,
            agents: Sequence[MessageNode[TDeps, Any]],
            *,
            name: str | None = None,
            description: str | None = None,
            display_name: str | None = None,
            shared_prompt: str | None = None,
            validator: MessageNode[Any, TResult] | None = None,
            picker: SupportsStructuredOutput | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> None: ...

        def __init__(
            self,
            agents: Sequence[MessageNode[TDeps, Any]],
            *,
            name: str | None = None,
            description: str | None = None,
            display_name: str | None = None,
            shared_prompt: str | None = None,
            validator: MessageNode[Any, TResult] | None = None,
            picker: SupportsStructuredOutput | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
            # result_mode: ResultMode = "last",
        ) -> None:
            super().__init__(
                agents,
                name=name,
                description=description,
                display_name=display_name,
                shared_prompt=shared_prompt,
                picker=picker,
                num_picks=num_picks,
                pick_prompt=pick_prompt,
            )
            self.validator = validator
            self.result_mode = "last"

        def __prompt__(self) -> str:
            """Format team info for prompts."""
            members = " -> ".join(a.name for a in self.nodes)
            desc = f" - {self.description}" if self.description else ""
            return f"Sequential Team {self.name!r}{desc}\nPipeline: {members}"

        async def run(
            self,
            *prompts: PromptCompatible | None,
            wait_for_connections: bool | None = None,
            store_history: bool = False,
            **kwargs: Any,
        ) -> ChatMessage[TResult]:
            """Run agents sequentially and return combined message."""
            # Prepare prompts and create user message
            user_msg, processed_prompts, original_message = await prepare_prompts(*prompts)
            self.message_received.emit(user_msg)
            # Execute sequential logic
            message_id = str(uuid4())  # Always generate unique response ID
            result = await self.execute(*processed_prompts, **kwargs)
            all_messages = [r.message for r in result if r.message]
            assert all_messages, "Error during execution, returned None for TeamRun"
            # Determine content based on mode
            match self.result_mode:
                case "last":
                    content = all_messages[-1].content
                # case "concat":
                #     content = "\n".join(msg.format() for msg in all_messages)
                case _:
                    msg = f"Invalid result mode: {self.result_mode}"
                    raise ValueError(msg)

            message = ChatMessage(
                content=content,
                messages=[m for chat_message in all_messages for m in chat_message.messages],
                role="assistant",
                name=self.name,
                associated_messages=all_messages,
                message_id=message_id,
                conversation_id=user_msg.conversation_id,
                metadata={
                    "execution_order": [r.agent_name for r in result],
                    "start_time": result.start_time.isoformat(),
                    "errors": {name: str(error) for name, error in result.errors.items()},
                },
            )

            if store_history:
                pass  # Teams could implement their own history management here if needed
            return await finalize_message(  # Finalize and route message
                message,
                user_msg,
                self,
                self.connections,
                original_message,
                wait_for_connections,
            )

        async def execute(
            self,
            *prompts: PromptCompatible | None,
            **kwargs: Any,
        ) -> TeamResponse[TResult]:
            """Start execution with optional monitoring."""
            self._team_talk.clear()
            start_time = get_now()
            prompts_ = list(prompts)
            if self.shared_prompt:
                prompts_.insert(0, self.shared_prompt)
            responses = [i async for i in self.execute_iter(*prompts_) if isinstance(i, AgentResponse)]
            return TeamResponse(responses, start_time)

        async def run_iter(
            self,
            *prompts: PromptCompatible,
            **kwargs: Any,
        ) -> AsyncIterator[ChatMessage[Any]]:
            """Yield messages from the execution chain."""
            async for item in self.execute_iter(*prompts, **kwargs):
                match item:
                    case AgentResponse():
                        if item.message:
                            yield item.message
                    case Talk():
                        pass

        async def execute_iter(
            self,
            *prompt: PromptCompatible,
            **kwargs: Any,
        ) -> AsyncIterator[Talk[Any] | AgentResponse[Any]]:
            from toprompt import to_prompt

            connections: list[Talk[Any]] = []
            try:
                combined_prompt = "\n".join([await to_prompt(p) for p in prompt])
                all_nodes = list(await self.pick_agents(combined_prompt))
                if self.validator:
                    all_nodes.append(self.validator)
                first = all_nodes[0]
                connections = [s.connect_to(t, queued=True) for s, t in pairwise(all_nodes)]
                for conn in connections:
                    self._team_talk.append(conn)

                # First agent
                start = perf_counter()
                message = await first.run(*prompt, **kwargs)
                timing = perf_counter() - start
                response = AgentResponse[Any](first.name, message=message, timing=timing)
                yield response

                # Process through chain
                for connection in connections:
                    target = connection.targets[0]
                    target_name = target.name
                    yield connection

                    # Let errors propagate - they break the chain
                    start = perf_counter()
                    messages = await connection.trigger()

                    if target == all_nodes[-1]:
                        last_talk = Talk[Any](target, [], connection_type="run")
                        if response.message:
                            last_talk.stats.messages.append(response.message)
                        self._team_talk.append(last_talk)

                    timing = perf_counter() - start
                    msg = messages[0]
                    response = AgentResponse[Any](target_name, message=msg, timing=timing)
                    yield response

            finally:  # Always clean up connections
                for connection in connections:
                    connection.disconnect()

        async def run_stream(
            self,
            *prompts: PromptCompatible,
            require_all: bool = True,
            **kwargs: Any,
        ) -> AsyncIterator[tuple[MessageNode[Any, Any], RichAgentStreamEvent[Any]]]:
            """Stream responses through the chain of team members.

            Args:
                prompts: Input prompts to process through the chain
                require_all: If True, fail if any agent fails. If False,
                             continue with remaining agents.
                kwargs: Additional arguments passed to each agent

            Yields:
                Tuples of (agent, event) where agent is the Agent instance
                and event is the streaming event.
            """
            from llmling_agent.agent.events import StreamCompleteEvent

            current_message = prompts
            collected_content = []
            for agent in self.nodes:
                try:
                    agent_content = []

                    # Use wrapper to normalize all streaming nodes to (agent, event) tuples
                    if not isinstance(agent, SupportsRunStream):
                        msg = f"Agent {agent.name} does not support streaming"
                        raise TypeError(msg)  # noqa: TRY301

                    stream = normalize_stream_for_teams(agent, *current_message, **kwargs)

                    async for agent_event_tuple in stream:
                        actual_agent, event = agent_event_tuple
                        match event:
                            case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
                                agent_content.append(delta)
                                collected_content.append(delta)
                                yield (actual_agent, event)  # Yield tuple with agent context
                            case StreamCompleteEvent(message=message):
                                # Use complete response as input for next agent
                                current_message = (message.content,)
                                yield (actual_agent, event)  # Yield tuple with agent context
                            case _:
                                yield (actual_agent, event)  # Yield tuple with agent context

                except Exception as e:
                    if require_all:
                        msg = f"Chain broken at {agent.name}: {e}"
                        logger.exception(msg)
                        raise ValueError(msg) from e
                    logger.warning("Chain handler failed", name=agent.name, error=e)
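
A minimal end-to-end sketch. The agent construction below is illustrative (the Agent import path and its name/model parameters are assumptions, not shown on this page); any MessageNode-compatible nodes work:

    import asyncio

    from llmling_agent import Agent  # assumed import path
    from llmling_agent.delegation.teamrun import TeamRun

    async def main() -> None:
        # Hypothetical member agents; the model names are placeholders.
        researcher = Agent(name="researcher", model="openai:gpt-4o-mini")
        writer = Agent(name="writer", model="openai:gpt-4o-mini")

        # Nodes run in order; each node's output feeds the next one.
        team = TeamRun([researcher, writer], name="pipeline")
        message = await team.run("Summarize the history of Unicode")
        print(message.content)                      # last node's content
        print(message.metadata["execution_order"])  # e.g. ["researcher", "writer"]

    asyncio.run(main())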
                

__prompt__

    __prompt__() -> str

Format team info for prompts.

Source code in src/llmling_agent/delegation/teamrun.py (lines 129-133)

    def __prompt__(self) -> str:
        """Format team info for prompts."""
        members = " -> ".join(a.name for a in self.nodes)
        desc = f" - {self.description}" if self.description else ""
        return f"Sequential Team {self.name!r}{desc}\nPipeline: {members}"
                

execute async

    execute(*prompts: PromptCompatible | None, **kwargs: Any) -> TeamResponse[TResult]

Start execution with optional monitoring.

Source code in src/llmling_agent/delegation/teamrun.py (lines 187-199)

    async def execute(
        self,
        *prompts: PromptCompatible | None,
        **kwargs: Any,
    ) -> TeamResponse[TResult]:
        """Start execution with optional monitoring."""
        self._team_talk.clear()
        start_time = get_now()
        prompts_ = list(prompts)
        if self.shared_prompt:
            prompts_.insert(0, self.shared_prompt)
        responses = [i async for i in self.execute_iter(*prompts_) if isinstance(i, AgentResponse)]
        return TeamResponse(responses, start_time)
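
A sketch of consuming the returned TeamResponse, based only on the attribute accesses visible in this page's source (iteration over AgentResponse items, plus start_time); team is assumed to be built as in the earlier example:

    # Sketch: run the pipeline and inspect per-agent results.
    result = await team.execute("Draft a release note")
    print(result.start_time.isoformat())
    for response in result:  # AgentResponse items, in execution order
        if response.message:
            print(f"{response.agent_name} ({response.timing:.2f}s): "
                  f"{response.message.content}")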
                

run async

    run(
        *prompts: PromptCompatible | None,
        wait_for_connections: bool | None = None,
        store_history: bool = False,
        **kwargs: Any
    ) -> ChatMessage[TResult]

Run agents sequentially and return combined message.

Source code in src/llmling_agent/delegation/teamrun.py (lines 135-185)

    async def run(
        self,
        *prompts: PromptCompatible | None,
        wait_for_connections: bool | None = None,
        store_history: bool = False,
        **kwargs: Any,
    ) -> ChatMessage[TResult]:
        """Run agents sequentially and return combined message."""
        # Prepare prompts and create user message
        user_msg, processed_prompts, original_message = await prepare_prompts(*prompts)
        self.message_received.emit(user_msg)
        # Execute sequential logic
        message_id = str(uuid4())  # Always generate unique response ID
        result = await self.execute(*processed_prompts, **kwargs)
        all_messages = [r.message for r in result if r.message]
        assert all_messages, "Error during execution, returned None for TeamRun"
        # Determine content based on mode
        match self.result_mode:
            case "last":
                content = all_messages[-1].content
            # case "concat":
            #     content = "\n".join(msg.format() for msg in all_messages)
            case _:
                msg = f"Invalid result mode: {self.result_mode}"
                raise ValueError(msg)

        message = ChatMessage(
            content=content,
            messages=[m for chat_message in all_messages for m in chat_message.messages],
            role="assistant",
            name=self.name,
            associated_messages=all_messages,
            message_id=message_id,
            conversation_id=user_msg.conversation_id,
            metadata={
                "execution_order": [r.agent_name for r in result],
                "start_time": result.start_time.isoformat(),
                "errors": {name: str(error) for name, error in result.errors.items()},
            },
        )

        if store_history:
            pass  # Teams could implement their own history management here if needed
        return await finalize_message(  # Finalize and route message
            message,
            user_msg,
            self,
            self.connections,
            original_message,
            wait_for_connections,
        )
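
A sketch focusing on what the combined ChatMessage carries; the metadata keys and associated_messages come straight from the source above, while the prompt text is made up:

    # Sketch: inspect the aggregate message returned by run().
    message = await team.run("Outline a blog post about WebAssembly")
    print(message.content)  # result_mode "last": the final node's content
    for original in message.associated_messages:  # one ChatMessage per node
        print(f"- {original.name}: {original.content!r}")
    for name, error in message.metadata["errors"].items():
        print(f"{name} failed: {error}")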
                

run_iter async

    run_iter(*prompts: PromptCompatible, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]

Yield messages from the execution chain.

Source code in src/llmling_agent/delegation/teamrun.py (lines 201-213)

    async def run_iter(
        self,
        *prompts: PromptCompatible,
        **kwargs: Any,
    ) -> AsyncIterator[ChatMessage[Any]]:
        """Yield messages from the execution chain."""
        async for item in self.execute_iter(*prompts, **kwargs):
            match item:
                case AgentResponse():
                    if item.message:
                        yield item.message
                case Talk():
                    pass
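
A sketch of incremental consumption: run_iter yields each node's ChatMessage as soon as that node finishes, so intermediate results can be shown before the whole chain completes:

    # Sketch: print each intermediate result as the chain progresses.
    async for chat_message in team.run_iter("Review this design doc"):
        print(f"[{chat_message.name}] {chat_message.content}")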
                

run_stream async

    run_stream(
        *prompts: PromptCompatible, require_all: bool = True, **kwargs: Any
    ) -> AsyncIterator[tuple[MessageNode[Any, Any], RichAgentStreamEvent[Any]]]

Stream responses through the chain of team members.

Parameters:

    prompts (PromptCompatible): Input prompts to process through the chain.
        Default: ().
    require_all (bool): If True, fail if any agent fails. If False, continue
        with remaining agents. Default: True.
    kwargs (Any): Additional arguments passed to each agent. Default: {}.

Yields:

    AsyncIterator[tuple[MessageNode[Any, Any], RichAgentStreamEvent[Any]]]:
        Tuples of (agent, event) where agent is the Agent instance and event
        is the streaming event.

Source code in src/llmling_agent/delegation/teamrun.py (lines 265-317)

    async def run_stream(
        self,
        *prompts: PromptCompatible,
        require_all: bool = True,
        **kwargs: Any,
    ) -> AsyncIterator[tuple[MessageNode[Any, Any], RichAgentStreamEvent[Any]]]:
        """Stream responses through the chain of team members.

        Args:
            prompts: Input prompts to process through the chain
            require_all: If True, fail if any agent fails. If False,
                         continue with remaining agents.
            kwargs: Additional arguments passed to each agent

        Yields:
            Tuples of (agent, event) where agent is the Agent instance
            and event is the streaming event.
        """
        from llmling_agent.agent.events import StreamCompleteEvent

        current_message = prompts
        collected_content = []
        for agent in self.nodes:
            try:
                agent_content = []

                # Use wrapper to normalize all streaming nodes to (agent, event) tuples
                if not isinstance(agent, SupportsRunStream):
                    msg = f"Agent {agent.name} does not support streaming"
                    raise TypeError(msg)  # noqa: TRY301

                stream = normalize_stream_for_teams(agent, *current_message, **kwargs)

                async for agent_event_tuple in stream:
                    actual_agent, event = agent_event_tuple
                    match event:
                        case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
                            agent_content.append(delta)
                            collected_content.append(delta)
                            yield (actual_agent, event)  # Yield tuple with agent context
                        case StreamCompleteEvent(message=message):
                            # Use complete response as input for next agent
                            current_message = (message.content,)
                            yield (actual_agent, event)  # Yield tuple with agent context
                        case _:
                            yield (actual_agent, event)  # Yield tuple with agent context

            except Exception as e:
                if require_all:
                    msg = f"Chain broken at {agent.name}: {e}"
                    logger.exception(msg)
                    raise ValueError(msg) from e
                logger.warning("Chain handler failed", name=agent.name, error=e)
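
A sketch of token-level streaming. The event matching mirrors the PartDeltaEvent/TextPartDelta handling in the source above; the import path shown is an assumption (these event types look like pydantic-ai's, but this page does not confirm where to import them from):

    # Sketch: print text deltas as they arrive from each node in the chain.
    from pydantic_ai.messages import PartDeltaEvent, TextPartDelta  # assumed import path

    async for node, event in team.run_stream("Write a haiku about rivers"):
        match event:
            case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
                print(delta, end="", flush=True)  # token-level text chunks
            case _:
                pass  # completion and other event types are also yielded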