
Team

Base classes

| Name | Module | Description |
| ---- | ------ | ----------- |
| `BaseTeam` | `llmling_agent.delegation.base_team` | Base class for Team and TeamRun. |
| `Generic` | `typing` | Abstract base class for generic types. |

⋔ Inheritance diagram

graph TD
  Team["team.Team"]
  BaseTeam["base_team.BaseTeam"]
  MessageNode["messagenode.MessageNode"]
  MessageEmitter["messageemitter.MessageEmitter"]
  TaskManagerMixin["tasks.TaskManagerMixin"]
  object["builtins.object"]
  ABC["abc.ABC"]
  Generic["typing.Generic"]
  BaseTeam --> Team
  MessageNode --> BaseTeam
  MessageEmitter --> MessageNode
  TaskManagerMixin --> MessageEmitter
  object --> TaskManagerMixin
  ABC --> MessageEmitter
  object --> ABC
  Generic --> MessageEmitter
  object --> Generic
  Generic --> MessageNode
  Generic --> BaseTeam
  Generic --> Team

🛈 DocStrings

Bases: BaseTeam[TDeps, Any]

Group of agents that can execute together.
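
A minimal usage sketch. Hedged: `analyst` and `critic` stand in for already-configured `Agent` instances, and the constructor call mirrors `BaseTeam`'s list-of-members shape, which may differ in detail.

import asyncio

from llmling_agent.delegation.team import Team


async def main() -> None:
    # Placeholder members: build real Agent instances here.
    team = Team([analyst, critic], name="reviewers")

    # Run every member in parallel on the same prompt.
    result = await team.execute("Review this design document.")
    for response in result:
        print(response.agent_name, response.timing)


asyncio.run(main())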

Source code in src/llmling_agent/delegation/team.py
class Team[TDeps](BaseTeam[TDeps, Any]):
    """Group of agents that can execute together."""

    async def execute(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
        **kwargs: Any,
    ) -> TeamResponse:
        """Run all agents in parallel with monitoring."""
        from llmling_agent.talk.talk import Talk

        self._team_talk.clear()

        start_time = datetime.now()
        responses: list[AgentResponse[Any]] = []
        errors: dict[str, Exception] = {}
        final_prompt = list(prompts)
        if self.shared_prompt:
            final_prompt.insert(0, self.shared_prompt)
        combined_prompt = "\n".join([await to_prompt(p) for p in final_prompt])
        all_nodes = list(await self.pick_agents(combined_prompt))
        # Create Talk connections for monitoring this execution
        execution_talks: list[Talk[Any]] = []
        for node in all_nodes:
            talk = Talk[Any](
                node,
                [],  # No actual forwarding, just for tracking
                connection_type="run",
                queued=True,
                queue_strategy="latest",
            )
            execution_talks.append(talk)
            self._team_talk.append(talk)  # Add to base class's TeamTalk

        async def _run(node: MessageNode[TDeps, Any]):
            try:
                start = perf_counter()
                message = await node.run(*final_prompt, **kwargs)
                timing = perf_counter() - start
                r = AgentResponse(agent_name=node.name, message=message, timing=timing)
                responses.append(r)

                # Update talk stats for this agent
                talk = next(t for t in execution_talks if t.source == node)
                talk._stats.messages.append(message)

            except Exception as e:  # noqa: BLE001
                errors[node.name] = e

        # Run all agents in parallel
        await asyncio.gather(*[_run(node) for node in all_nodes])

        return TeamResponse(responses=responses, start_time=start_time, errors=errors)

    def __prompt__(self) -> str:
        """Format team info for prompts."""
        members = ", ".join(a.name for a in self.agents)
        desc = f" - {self.description}" if self.description else ""
        return f"Parallel Team '{self.name}'{desc}\nMembers: {members}"

    async def run_iter(
        self,
        *prompts: AnyPromptType,
        **kwargs: Any,
    ) -> AsyncIterator[ChatMessage[Any]]:
        """Yield messages as they arrive from parallel execution."""
        queue: asyncio.Queue[ChatMessage[Any] | None] = asyncio.Queue()
        failures: dict[str, Exception] = {}

        async def _run(node: MessageNode[TDeps, Any]):
            try:
                message = await node.run(*prompts, **kwargs)
                await queue.put(message)
            except Exception as e:
                logger.exception("Error executing node %s", node.name)
                failures[node.name] = e
                # Put None to maintain queue count
                await queue.put(None)

        # Get nodes to run
        combined_prompt = "\n".join([await to_prompt(p) for p in prompts])
        all_nodes = list(await self.pick_agents(combined_prompt))

        # Start all agents
        tasks = [asyncio.create_task(_run(n), name=f"run_{n.name}") for n in all_nodes]

        try:
            # Yield messages as they arrive
            for _ in all_nodes:
                if msg := await queue.get():
                    yield msg

            # If any failures occurred, raise error with details
            if failures:
                error_details = "\n".join(
                    f"- {name}: {error}" for name, error in failures.items()
                )
                error_msg = f"Some nodes failed to execute:\n{error_details}"
                raise RuntimeError(error_msg)

        finally:
            # Clean up any remaining tasks
            for task in tasks:
                if not task.done():
                    task.cancel()

    async def _run(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
        wait_for_connections: bool | None = None,
        message_id: str | None = None,
        conversation_id: str | None = None,
        **kwargs: Any,
    ) -> ChatMessage[list[Any]]:
        """Run all agents in parallel and return combined message."""
        result: TeamResponse = await self.execute(*prompts, **kwargs)
        message_id = message_id or str(uuid4())
        return ChatMessage(
            content=[r.message.content for r in result if r.message],
            role="assistant",
            name=self.name,
            message_id=message_id,
            conversation_id=conversation_id,
            metadata={
                "agent_names": [r.agent_name for r in result],
                "errors": {name: str(error) for name, error in result.errors.items()},
                "start_time": result.start_time.isoformat(),
            },
        )

    async def run_job[TJobResult](
        self,
        job: Job[TDeps, TJobResult],
        *,
        store_history: bool = True,
        include_agent_tools: bool = True,
    ) -> list[AgentResponse[TJobResult]]:
        """Execute a job across all team members in parallel.

        Args:
            job: Job configuration to execute
            store_history: Whether to add job execution to conversation history
            include_agent_tools: Whether to include agent's tools alongside job tools

        Returns:
            List of responses from all agents

        Raises:
            JobError: If job execution fails for any agent
            ValueError: If job configuration is invalid
        """
        from llmling_agent.agent import Agent, StructuredAgent
        from llmling_agent.tasks import JobError

        responses: list[AgentResponse[TJobResult]] = []
        errors: dict[str, Exception] = {}
        start_time = datetime.now()

        # Validate dependencies for all agents
        if job.required_dependency is not None:
            invalid_agents = [
                agent.name
                for agent in self.iter_agents()
                if not isinstance(agent.context.data, job.required_dependency)
            ]
            if invalid_agents:
                msg = (
                    f"Agents {', '.join(invalid_agents)} don't have required "
                    f"dependency type: {job.required_dependency}"
                )
                raise JobError(msg)

        try:
            # Load knowledge for all agents if provided
            if job.knowledge:
                # TODO: resources
                tools = [t.name for t in job.get_tools()]
                await self.distribute(content="", tools=tools)

            prompt = await job.get_prompt()

            async def _run(agent: MessageNode[TDeps, TJobResult]):
                assert isinstance(agent, Agent | StructuredAgent)
                try:
                    with agent.tools.temporary_tools(
                        job.get_tools(), exclusive=not include_agent_tools
                    ):
                        start = perf_counter()
                        resp = AgentResponse(
                            agent_name=agent.name,
                            message=await agent.run(prompt, store_history=store_history),  # pyright: ignore
                            timing=perf_counter() - start,
                        )
                        responses.append(resp)
                except Exception as e:  # noqa: BLE001
                    errors[agent.name] = e

            # Run job in parallel on all agents
            await asyncio.gather(*[_run(node) for node in self.agents])

            return TeamResponse(responses=responses, start_time=start_time, errors=errors)

        except Exception as e:
            msg = "Job execution failed"
            logger.exception(msg)
            raise JobError(msg) from e

__prompt__

__prompt__() -> str

Format team info for prompts.
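
Illustrative output, assuming the `reviewers` team from the sketch above with no description set:

print(team.__prompt__())
# Parallel Team 'reviewers'
# Members: analyst, critic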

Source code in src/llmling_agent/delegation/team.py
def __prompt__(self) -> str:
    """Format team info for prompts."""
    members = ", ".join(a.name for a in self.agents)
    desc = f" - {self.description}" if self.description else ""
    return f"Parallel Team '{self.name}'{desc}\nMembers: {members}"

_run async

_run(
    *prompts: AnyPromptType | Image | PathLike[str] | None,
    wait_for_connections: bool | None = None,
    message_id: str | None = None,
    conversation_id: str | None = None,
    **kwargs: Any,
) -> ChatMessage[list[Any]]

Run all agents in parallel and return combined message.
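
`_run` is what makes the team usable as a single message node. A sketch of the combined message it produces, assuming the inherited public `run` delegates here, as is conventional for `MessageNode` subclasses:

message = await team.run("Summarize the findings.")

# One content entry per member that produced a message:
print(message.content)

# Per-run bookkeeping lives in the metadata:
print(message.metadata["agent_names"])
print(message.metadata["errors"])      # name -> stringified exception
print(message.metadata["start_time"])  # ISO timestamp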

Source code in src/llmling_agent/delegation/team.py
async def _run(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
    wait_for_connections: bool | None = None,
    message_id: str | None = None,
    conversation_id: str | None = None,
    **kwargs: Any,
) -> ChatMessage[list[Any]]:
    """Run all agents in parallel and return combined message."""
    result: TeamResponse = await self.execute(*prompts, **kwargs)
    message_id = message_id or str(uuid4())
    return ChatMessage(
        content=[r.message.content for r in result if r.message],
        role="assistant",
        name=self.name,
        message_id=message_id,
        conversation_id=conversation_id,
        metadata={
            "agent_names": [r.agent_name for r in result],
            "errors": {name: str(error) for name, error in result.errors.items()},
            "start_time": result.start_time.isoformat(),
        },
    )

execute async

execute(
    *prompts: AnyPromptType | Image | PathLike[str] | None, **kwargs: Any
) -> TeamResponse

Run all agents in parallel with monitoring.
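
A sketch of inspecting the returned `TeamResponse`, using the `team` placeholder from above. Note that member failures are collected per agent rather than raised:

result = await team.execute("Draft a release announcement.")

for response in result:
    print(f"{response.agent_name}: {response.timing:.2f}s")

# Failed members do not abort the run; their exceptions are collected.
for name, error in result.errors.items():
    print(f"{name} failed: {error}")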

Source code in src/llmling_agent/delegation/team.py
async def execute(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
    **kwargs: Any,
) -> TeamResponse:
    """Run all agents in parallel with monitoring."""
    from llmling_agent.talk.talk import Talk

    self._team_talk.clear()

    start_time = datetime.now()
    responses: list[AgentResponse[Any]] = []
    errors: dict[str, Exception] = {}
    final_prompt = list(prompts)
    if self.shared_prompt:
        final_prompt.insert(0, self.shared_prompt)
    combined_prompt = "\n".join([await to_prompt(p) for p in final_prompt])
    all_nodes = list(await self.pick_agents(combined_prompt))
    # Create Talk connections for monitoring this execution
    execution_talks: list[Talk[Any]] = []
    for node in all_nodes:
        talk = Talk[Any](
            node,
            [],  # No actual forwarding, just for tracking
            connection_type="run",
            queued=True,
            queue_strategy="latest",
        )
        execution_talks.append(talk)
        self._team_talk.append(talk)  # Add to base class's TeamTalk

    async def _run(node: MessageNode[TDeps, Any]):
        try:
            start = perf_counter()
            message = await node.run(*final_prompt, **kwargs)
            timing = perf_counter() - start
            r = AgentResponse(agent_name=node.name, message=message, timing=timing)
            responses.append(r)

            # Update talk stats for this agent
            talk = next(t for t in execution_talks if t.source == node)
            talk._stats.messages.append(message)

        except Exception as e:  # noqa: BLE001
            errors[node.name] = e

    # Run all agents in parallel
    await asyncio.gather(*[_run(node) for node in all_nodes])

    return TeamResponse(responses=responses, start_time=start_time, errors=errors)

run_iter async

run_iter(*prompts: AnyPromptType, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]

Yield messages as they arrive from parallel execution.
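
A streaming sketch with the `team` placeholder from above. Messages arrive in completion order, not member order, and a `RuntimeError` summarizing any failures is raised only after all members have finished:

async for message in team.run_iter("Evaluate the proposal."):
    # Each message comes from whichever member finished next.
    print(message.name, message.content)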

Source code in src/llmling_agent/delegation/team.py
async def run_iter(
    self,
    *prompts: AnyPromptType,
    **kwargs: Any,
) -> AsyncIterator[ChatMessage[Any]]:
    """Yield messages as they arrive from parallel execution."""
    queue: asyncio.Queue[ChatMessage[Any] | None] = asyncio.Queue()
    failures: dict[str, Exception] = {}

    async def _run(node: MessageNode[TDeps, Any]):
        try:
            message = await node.run(*prompts, **kwargs)
            await queue.put(message)
        except Exception as e:
            logger.exception("Error executing node %s", node.name)
            failures[node.name] = e
            # Put None to maintain queue count
            await queue.put(None)

    # Get nodes to run
    combined_prompt = "\n".join([await to_prompt(p) for p in prompts])
    all_nodes = list(await self.pick_agents(combined_prompt))

    # Start all agents
    tasks = [asyncio.create_task(_run(n), name=f"run_{n.name}") for n in all_nodes]

    try:
        # Yield messages as they arrive
        for _ in all_nodes:
            if msg := await queue.get():
                yield msg

        # If any failures occurred, raise error with details
        if failures:
            error_details = "\n".join(
                f"- {name}: {error}" for name, error in failures.items()
            )
            error_msg = f"Some nodes failed to execute:\n{error_details}"
            raise RuntimeError(error_msg)

    finally:
        # Clean up any remaining tasks
        for task in tasks:
            if not task.done():
                task.cancel()

run_job async

run_job(
    job: Job[TDeps, TJobResult],
    *,
    store_history: bool = True,
    include_agent_tools: bool = True,
) -> list[AgentResponse[TJobResult]]

Execute a job across all team members in parallel.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `job` | `Job[TDeps, TJobResult]` | Job configuration to execute | *required* |
| `store_history` | `bool` | Whether to add job execution to conversation history | `True` |
| `include_agent_tools` | `bool` | Whether to include agent's tools alongside job tools | `True` |

Returns:

| Type | Description |
| ---- | ----------- |
| `list[AgentResponse[TJobResult]]` | List of responses from all agents |

Raises:

| Type | Description |
| ---- | ----------- |
| `JobError` | If job execution fails for any agent |
| `ValueError` | If job configuration is invalid |
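
A hedged sketch of running a job across the team. The `Job` construction is elided because its API is not shown here; the keyword arguments are taken from the signature above:

job = ...  # a configured Job[TDeps, str]; construction not shown

responses = await team.run_job(
    job,
    store_history=True,          # record the run in each member's history
    include_agent_tools=False,   # job tools only, via temporary_tools(exclusive=True)
)
for response in responses:
    print(response.agent_name, response.timing)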

Source code in src/llmling_agent/delegation/team.py
async def run_job[TJobResult](
    self,
    job: Job[TDeps, TJobResult],
    *,
    store_history: bool = True,
    include_agent_tools: bool = True,
) -> list[AgentResponse[TJobResult]]:
    """Execute a job across all team members in parallel.

    Args:
        job: Job configuration to execute
        store_history: Whether to add job execution to conversation history
        include_agent_tools: Whether to include agent's tools alongside job tools

    Returns:
        List of responses from all agents

    Raises:
        JobError: If job execution fails for any agent
        ValueError: If job configuration is invalid
    """
    from llmling_agent.agent import Agent, StructuredAgent
    from llmling_agent.tasks import JobError

    responses: list[AgentResponse[TJobResult]] = []
    errors: dict[str, Exception] = {}
    start_time = datetime.now()

    # Validate dependencies for all agents
    if job.required_dependency is not None:
        invalid_agents = [
            agent.name
            for agent in self.iter_agents()
            if not isinstance(agent.context.data, job.required_dependency)
        ]
        if invalid_agents:
            msg = (
                f"Agents {', '.join(invalid_agents)} don't have required "
                f"dependency type: {job.required_dependency}"
            )
            raise JobError(msg)

    try:
        # Load knowledge for all agents if provided
        if job.knowledge:
            # TODO: resources
            tools = [t.name for t in job.get_tools()]
            await self.distribute(content="", tools=tools)

        prompt = await job.get_prompt()

        async def _run(agent: MessageNode[TDeps, TJobResult]):
            assert isinstance(agent, Agent | StructuredAgent)
            try:
                with agent.tools.temporary_tools(
                    job.get_tools(), exclusive=not include_agent_tools
                ):
                    start = perf_counter()
                    resp = AgentResponse(
                        agent_name=agent.name,
                        message=await agent.run(prompt, store_history=store_history),  # pyright: ignore
                        timing=perf_counter() - start,
                    )
                    responses.append(resp)
            except Exception as e:  # noqa: BLE001
                errors[agent.name] = e

        # Run job in parallel on all agents
        await asyncio.gather(*[_run(node) for node in self.agents])

        return TeamResponse(responses=responses, start_time=start_time, errors=errors)

    except Exception as e:
        msg = "Job execution failed"
        logger.exception(msg)
        raise JobError(msg) from e
