Skip to content

AgentPool

Base classes

Name Children Inherits
BaseRegistry
llmling_agent.utils.baseregistry
Base class for registries providing item storage and change notifications.
Generic
typing
Abstract base class for generic types.

⋔ Inheritance diagram

graph TD
  94738104971088["pool.AgentPool"]
  94738094091664["baseregistry.BaseRegistry"]
  94738048501504["abc.MutableMapping"]
  94738048497536["abc.Mapping"]
  94738048491584["abc.Collection"]
  94738048489600["abc.Sized"]
  140588174154208["builtins.object"]
  94738048485632["abc.Iterable"]
  94738048490592["abc.Container"]
  94738048344736["abc.ABC"]
  94738047949568["typing.Generic"]
  94738094091664 --> 94738104971088
  94738048501504 --> 94738094091664
  94738048497536 --> 94738048501504
  94738048491584 --> 94738048497536
  94738048489600 --> 94738048491584
  140588174154208 --> 94738048489600
  94738048485632 --> 94738048491584
  140588174154208 --> 94738048485632
  94738048490592 --> 94738048491584
  140588174154208 --> 94738048490592
  94738048344736 --> 94738094091664
  140588174154208 --> 94738048344736
  94738047949568 --> 94738094091664
  140588174154208 --> 94738047949568
  94738047949568 --> 94738104971088

🛈 DocStrings

Bases: BaseRegistry[NodeName, MessageNode[Any, Any]]

Pool managing message processing nodes (agents and teams).

Acts as a unified registry for all nodes, providing:

- Centralized node management and lookup
- Shared dependency injection
- Connection management
- Resource coordination

Nodes can be accessed through:

- nodes: All registered nodes (agents and teams)
- agents: Only Agent instances
- teams: Only Team instances

Source code in src/llmling_agent/delegation/pool.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
class AgentPool[TPoolDeps = None](BaseRegistry[NodeName, MessageNode[Any, Any]]):
    """Pool managing message processing nodes (agents and teams).

    Acts as a unified registry for all nodes, providing:
    - Centralized node management and lookup
    - Shared dependency injection
    - Connection management
    - Resource coordination

    Nodes can be accessed through:
    - nodes: All registered nodes (agents and teams)
    - agents: Only Agent instances
    - teams: Only Team instances
    """

    def __init__(
        self,
        manifest: JoinablePathLike | AgentsManifest | None = None,
        *,
        shared_deps: TPoolDeps | None = None,
        connect_nodes: bool = True,
        input_provider: InputProvider | None = None,
        parallel_load: bool = True,
        event_handlers: list[IndividualEventHandler] | None = None,
    ):
        """Initialize agent pool with immediate agent creation.

        Args:
            manifest: Agent configuration manifest
            shared_deps: Dependencies to share across all nodes
            connect_nodes: Whether to set up forwarding connections
            input_provider: Input provider for tool / step confirmations / HumanAgents
            parallel_load: Whether to load nodes in parallel (async)
            event_handlers: Event handlers to pass through to all agents

        Raises:
            ValueError: If manifest contains invalid node configurations
            RuntimeError: If node initialization fails
        """
        super().__init__()
        # Deferred imports to avoid import cycles at module load time.
        from llmling_agent.mcp_server.manager import MCPManager
        from llmling_agent.models.manifest import AgentsManifest
        from llmling_agent.observability import registry
        from llmling_agent.skills.manager import SkillsManager
        from llmling_agent.storage import StorageManager

        # Accept a ready manifest, a path-like to load from, or None (empty manifest).
        match manifest:
            case None:
                self.manifest = AgentsManifest()
            case str() | os.PathLike() | UPath():
                self.manifest = AgentsManifest.from_file(manifest)
            case AgentsManifest():
                self.manifest = manifest
            case _:
                msg = f"Invalid config path: {manifest}"
                raise ValueError(msg)

        registry.configure_observability(self.manifest.observability)
        self.shared_deps = shared_deps
        self._input_provider = input_provider
        # Owns the lifetime of all async components entered in __aenter__.
        self.exit_stack = AsyncExitStack()
        self.parallel_load = parallel_load
        self.storage = StorageManager(self.manifest.storage)
        self.event_handlers = event_handlers or []

        self.connection_registry = ConnectionRegistry()
        servers = self.manifest.get_mcp_servers()
        self.mcp = MCPManager(name="pool_mcp", servers=servers, owner="pool")
        self.skills = SkillsManager(name="pool_skills", owner="pool")
        self._tasks = TaskRegistry()
        # Register tasks from manifest
        for name, task in self.manifest.jobs.items():
            self._tasks.register(name, task)
        self.process_manager = ProcessManager()
        self.pool_talk = TeamTalk[Any].from_nodes(list(self.nodes.values()))
        # MCP server is now managed externally
        self.server = None
        # Create requested agents immediately
        for name in self.manifest.agents:
            agent = self.manifest.get_agent(
                name,
                deps=shared_deps,
                input_provider=self._input_provider,
                pool=self,
                event_handlers=self.event_handlers,
            )
            self.register(name, agent)

        # Then set up worker relationships
        for agent in self.agents.values():
            self.setup_agent_workers(agent)
        self._create_teams()
        # Set up forwarding connections
        if connect_nodes:
            self._connect_nodes()

        self._enter_lock = Lock()  # Initialize async safety fields
        self._running_count = 0

    async def __aenter__(self) -> Self:
        """Enter async context and initialize all agents."""
        # Reference-counted entry: only the first __aenter__ performs initialization;
        # nested enters just bump the counter.
        async with self._enter_lock:
            if self._running_count == 0:
                try:
                    # Initialize MCP manager first, then add aggregating provider
                    await self.exit_stack.enter_async_context(self.mcp)
                    await self.exit_stack.enter_async_context(self.skills)
                    aggregating_provider = self.mcp.get_aggregating_provider()
                    skills_provider = self.skills.get_skills_provider()

                    agents = list(self.agents.values())
                    teams = list(self.teams.values())
                    for agent in agents:
                        agent.tools.add_provider(aggregating_provider)
                        agent.tools.add_provider(skills_provider)

                    # Collect remaining components to initialize (MCP already initialized)
                    components: list[AbstractAsyncContextManager[Any]] = [
                        self.storage,
                        *agents,
                        *teams,
                    ]

                    # MCP server is now managed externally - removed from pool
                    # Initialize all components
                    if self.parallel_load:
                        await asyncio.gather(
                            *(self.exit_stack.enter_async_context(c) for c in components)
                        )
                    else:
                        for component in components:
                            await self.exit_stack.enter_async_context(component)

                except Exception as e:
                    # Unwind anything that was already entered before re-raising.
                    await self.cleanup()
                    msg = "Failed to initialize agent pool"
                    logger.exception(msg, exc_info=e)
                    raise RuntimeError(msg) from e
            self._running_count += 1
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit async context."""
        # NOTE(review): this count check happens before acquiring _enter_lock, so a
        # concurrent __aexit__ could race past it — confirm callers exit from a
        # single task, or move the check inside the lock.
        if self._running_count == 0:
            msg = "AgentPool.__aexit__ called more times than __aenter__"
            raise ValueError(msg)
        async with self._enter_lock:
            self._running_count -= 1
            if self._running_count == 0:
                # Remove MCP aggregating provider from all agents
                aggregating_provider = self.mcp.get_aggregating_provider()
                skills_provider = self.skills.get_skills_provider()
                for agent in self.agents.values():
                    agent.tools.remove_provider(aggregating_provider.name)
                    agent.tools.remove_provider(skills_provider.name)
                await self.cleanup()

    @property
    def is_running(self) -> bool:
        """Check if the agent pool is running."""
        return bool(self._running_count)

    async def cleanup(self) -> None:
        """Clean up all agents."""
        # Clean up background processes first
        await self.process_manager.cleanup()
        await self.exit_stack.aclose()
        self.clear()

    @overload
    def create_team_run[TResult](
        self,
        agents: Sequence[str],
        validator: MessageNode[Any, TResult] | None = None,
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> TeamRun[TPoolDeps, TResult]: ...

    @overload
    def create_team_run[TDeps, TResult](
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        validator: MessageNode[Any, TResult] | None = None,
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> TeamRun[TDeps, TResult]: ...

    @overload
    def create_team_run[TResult](
        self,
        agents: Sequence[AgentName | MessageNode[Any, Any]],
        validator: MessageNode[Any, TResult] | None = None,
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> TeamRun[Any, TResult]: ...

    def create_team_run[TResult](
        self,
        agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
        validator: MessageNode[Any, TResult] | None = None,
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> TeamRun[Any, TResult]:
        """Create a sequential TeamRun from a list of Agents.

        Args:
            agents: List of agent names or team/agent instances (all if None)
            validator: Node to validate the results of the TeamRun
            name: Optional name for the team
            description: Optional description for the team
            shared_prompt: Optional prompt for all agents
            picker: Agent to use for picking agents
            num_picks: Number of agents to pick
            pick_prompt: Prompt to use for picking agents
        """
        from llmling_agent.delegation.teamrun import TeamRun

        if agents is None:
            agents = list(self.agents.keys())

        # First resolve/configure agents
        resolved_agents: list[MessageNode[Any, Any]] = []
        for agent in agents:
            if isinstance(agent, str):
                agent = self.get_agent(agent)
            resolved_agents.append(agent)
        team = TeamRun(
            resolved_agents,
            name=name,
            description=description,
            validator=validator,
            shared_prompt=shared_prompt,
            picker=picker,
            num_picks=num_picks,
            pick_prompt=pick_prompt,
        )
        # Named teams are also registered in the pool for later lookup.
        if name:
            self[name] = team
        return team

    @overload
    def create_team(self, agents: Sequence[str]) -> Team[TPoolDeps]: ...

    @overload
    def create_team[TDeps](
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> Team[TDeps]: ...

    @overload
    def create_team(
        self,
        agents: Sequence[AgentName | MessageNode[Any, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> Team[Any]: ...

    def create_team(
        self,
        agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: Agent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> Team[Any]:
        """Create a group from agent names or instances.

        Args:
            agents: List of agent names or instances (all if None)
            name: Optional name for the team
            description: Optional description for the team
            shared_prompt: Optional prompt for all agents
            picker: Agent to use for picking agents
            num_picks: Number of agents to pick
            pick_prompt: Prompt to use for picking agents
        """
        from llmling_agent.delegation.team import Team

        if agents is None:
            agents = list(self.agents.keys())

        resolved_agents = [self.get_agent(i) if isinstance(i, str) else i for i in agents]
        team = Team(
            name=name,
            description=description,
            agents=resolved_agents,
            shared_prompt=shared_prompt,
            picker=picker,
            num_picks=num_picks,
            pick_prompt=pick_prompt,
        )
        # Named teams are also registered in the pool for later lookup.
        if name:
            self[name] = team
        return team

    @asynccontextmanager
    async def track_message_flow(self) -> AsyncIterator[MessageFlowTracker]:
        """Track message flow during a context."""
        tracker = MessageFlowTracker()
        self.connection_registry.message_flow.connect(tracker.track)
        try:
            yield tracker
        finally:
            # Always detach the tracker, even if the body raises.
            self.connection_registry.message_flow.disconnect(tracker.track)

    async def run_event_loop(self) -> None:
        """Run pool in event-watching mode until interrupted."""
        print("Starting event watch mode...")
        print("Active nodes: ", ", ".join(self.list_nodes()))
        print("Press Ctrl+C to stop")

        with suppress(KeyboardInterrupt):
            while True:
                await asyncio.sleep(1)

    @property
    def agents(self) -> dict[str, Agent[Any, Any]]:
        """Get agents dict (backward compatibility)."""
        return {i.name: i for i in self._items.values() if isinstance(i, Agent)}

    @property
    def teams(self) -> dict[str, BaseTeam[Any, Any]]:
        """Get teams dict (backward compatibility)."""
        from llmling_agent.delegation.base_team import BaseTeam

        return {i.name: i for i in self._items.values() if isinstance(i, BaseTeam)}

    @property
    def nodes(self) -> dict[str, MessageNode[Any, Any]]:
        """Get nodes dict (backward compatibility)."""
        from llmling_agent import MessageNode

        return {i.name: i for i in self._items.values() if isinstance(i, MessageNode)}

    @property
    def node_events(self) -> DictEvents:
        """Get node events."""
        return self._items.events

    def _validate_item(self, item: MessageNode[Any, Any] | Any) -> MessageNode[Any, Any]:
        """Validate and convert items before registration.

        Args:
            item: Item to validate

        Returns:
            Validated Node

        Raises:
            LLMlingError: If item is not a valid node
        """
        if not isinstance(item, MessageNode):
            msg = f"Item must be Agent or Team, got {type(item)}"
            raise self._error_class(msg)
        # Registering a node binds it to this pool.
        item.context.pool = self
        return item

    def _create_teams(self) -> None:
        """Create all teams in two phases to allow nesting."""
        # Phase 1: Create empty teams

        empty_teams: dict[str, BaseTeam[Any, Any]] = {}
        for name, config in self.manifest.teams.items():
            if config.mode == "parallel":
                empty_teams[name] = Team([], name=name, shared_prompt=config.shared_prompt)
            else:
                empty_teams[name] = TeamRun([], name=name, shared_prompt=config.shared_prompt)

        # Phase 2: Resolve members
        for name, config in self.manifest.teams.items():
            team = empty_teams[name]
            members: list[MessageNode[Any, Any]] = []
            for member in config.members:
                if member in self.agents:
                    members.append(self.agents[member])
                elif member in empty_teams:
                    members.append(empty_teams[member])
                else:
                    msg = f"Unknown team member: {member}"
                    raise ValueError(msg)
            team.agents.extend(members)
            self[name] = team

    def _connect_nodes(self) -> None:
        """Set up connections defined in manifest."""
        # Merge agent and team configs into one dict of nodes with connections
        for name, config in self.manifest.nodes.items():
            source = self[name]
            for target in config.connections or []:
                # NOTE(review): the FileConnectionConfig case rebinds `name`, so later
                # iterations of this inner loop (error messages and connect_to(name=...))
                # see the rewritten value instead of the source node name — confirm
                # this is intended.
                match target:
                    case NodeConnectionConfig(name=name_):
                        if name_ not in self:
                            msg = f"Forward target {name_} not found for {name}"
                            raise ValueError(msg)
                        target_node = self[name_]
                    case FileConnectionConfig(path=path_obj):
                        name = f"file_writer_{UPath(path_obj).stem}"
                        target_node = Agent(model=target.get_model(), name=name)
                    case CallableConnectionConfig(callable=fn):
                        target_node = Agent(model=target.get_model(), name=fn.__name__)
                    case _:
                        msg = f"Invalid connection config: {target}"
                        raise ValueError(msg)

                source.connect_to(
                    target_node,
                    connection_type=target.connection_type,
                    name=name,
                    priority=target.priority,
                    delay=target.delay,
                    queued=target.queued,
                    queue_strategy=target.queue_strategy,
                    transform=target.transform,
                    filter_condition=target.filter_condition.check
                    if target.filter_condition
                    else None,
                    stop_condition=target.stop_condition.check if target.stop_condition else None,
                    exit_condition=target.exit_condition.check if target.exit_condition else None,
                )
                source.connections.set_wait_state(
                    target_node,
                    wait=target.wait_for_completion,
                )

    def setup_agent_workers(self, agent: Agent[Any, Any]) -> None:
        """Set up workers for an agent from configuration."""
        for worker_config in agent.context.config.workers:
            try:
                worker = self.nodes[worker_config.name]
                match worker_config:
                    case TeamWorkerConfig():
                        agent.register_worker(worker)
                    case AgentWorkerConfig():
                        agent.register_worker(
                            worker,
                            reset_history_on_run=worker_config.reset_history_on_run,
                            pass_message_history=worker_config.pass_message_history,
                        )
            except KeyError as e:
                msg = f"Worker agent {worker_config.name!r} not found"
                raise ValueError(msg) from e

    @overload
    def get_agent[TResult = str](
        self,
        agent: AgentName | Agent[Any, str],
        *,
        return_type: type[TResult] = str,  # type: ignore
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
    ) -> Agent[TPoolDeps, TResult]: ...

    @overload
    def get_agent[TCustomDeps, TResult = str](
        self,
        agent: AgentName | Agent[Any, str],
        *,
        deps_type: type[TCustomDeps],
        return_type: type[TResult] = str,  # type: ignore
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
    ) -> Agent[TCustomDeps, TResult]: ...

    def get_agent(
        self,
        agent: AgentName | Agent[Any, str],
        *,
        deps_type: Any | None = None,
        return_type: Any = str,
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
    ) -> Agent[Any, Any]:
        """Get or configure an agent from the pool.

        This method provides flexible agent configuration with dependency injection:
        - Without deps: Agent uses pool's shared dependencies
        - With deps: Agent uses provided custom dependencies

        Args:
            agent: Either agent name or instance
            deps_type: Optional custom dependencies type (overrides shared deps)
            return_type: Optional type for structured responses
            model_override: Optional model override
            session: Optional session ID or query to recover conversation

        Returns:
            Either:
            - Agent[TPoolDeps] when using pool's shared deps
            - Agent[TCustomDeps] when custom deps provided

        Raises:
            KeyError: If agent name not found
            ValueError: If configuration is invalid
        """
        from llmling_agent.agent import Agent

        # NOTE(review): this mutates the existing pool agent in place (deps_type,
        # model, history, structure) rather than returning a configured copy —
        # confirm callers expect the shared instance to change.
        base = agent if isinstance(agent, Agent) else self.agents[agent]
        # Use custom deps if provided, otherwise use shared deps
        # base.context.data = deps if deps is not None else self.shared_deps
        base.deps_type = deps_type
        base.context.pool = self
        if model_override:
            base.set_model(model_override)
        if session:
            base.conversation.load_history_from_database(session=session)
        if return_type not in {str, None}:
            base.to_structured(return_type)

        return base

    def list_nodes(self) -> list[str]:
        """List available agent names."""
        return list(self.list_items())

    def get_job(self, name: str) -> Job[Any, Any]:
        """Get a registered job by name."""
        return self._tasks[name]

    def register_task(self, name: str, task: Job[Any, Any]) -> None:
        """Register a job in the pool's task registry under ``name``."""
        self._tasks.register(name, task)

    async def add_agent[TResult = str](
        self,
        name: AgentName,
        *,
        output_type: OutputSpec[TResult] = str,  # type: ignore[assignment]
        **kwargs: Unpack[AgentKwargs],
    ) -> Agent[Any, TResult]:
        """Add a new permanent agent to the pool.

        Args:
            name: Name for the new agent
            output_type: Optional type for structured responses:
            **kwargs: Additional agent configuration
        Returns:
            An agent instance
        """
        from llmling_agent.agent import Agent

        # Inherit the pool's event handlers unless the caller supplied their own.
        if not kwargs.get("event_handlers"):
            kwargs["event_handlers"] = self.event_handlers
        agent: Agent[Any, TResult] = Agent(
            name=name,
            **kwargs,
            output_type=output_type,
            agent_pool=self,
        )
        # Add MCP aggregating provider from manager
        agent.tools.add_provider(self.mcp.get_aggregating_provider())
        agent.tools.add_provider(self.skills.get_skills_provider())
        agent = await self.exit_stack.enter_async_context(agent)
        self.register(name, agent)
        return agent

    def get_mermaid_diagram(self, include_details: bool = True) -> str:
        """Generate mermaid flowchart of all agents and their connections.

        Args:
            include_details: Whether to show connection details (types, queues, etc)
        """
        lines = ["flowchart LR"]

        # Add all agents as nodes
        for name in self.agents:
            lines.append(f"    {name}[{name}]")  # noqa: PERF401

        # Add all connections as edges
        for agent in self.agents.values():
            connections = agent.connections.get_connections()
            for talk in connections:
                source = talk.source.name
                for target in talk.targets:
                    if include_details:
                        details: list[str] = []
                        details.append(talk.connection_type)
                        if talk.queued:
                            details.append(f"queued({talk.queue_strategy})")
                        if fn := talk.filter_condition:
                            details.append(f"filter:{fn.__name__}")
                        if fn := talk.stop_condition:
                            details.append(f"stop:{fn.__name__}")
                        if fn := talk.exit_condition:
                            details.append(f"exit:{fn.__name__}")

                        label = f"|{' '.join(details)}|" if details else ""
                        lines.append(f"    {source}--{label}-->{target.name}")
                    else:
                        lines.append(f"    {source}-->{target.name}")

        return "\n".join(lines)

agents property

agents: dict[str, Agent[Any, Any]]

Get agents dict (backward compatibility).

is_running property

is_running: bool

Check if the agent pool is running.

node_events property

node_events: DictEvents

Get node events.

nodes property

nodes: dict[str, MessageNode[Any, Any]]

Get nodes dict (backward compatibility).

teams property

teams: dict[str, BaseTeam[Any, Any]]

Get teams dict (backward compatibility).

__aenter__ async

__aenter__() -> Self

Enter async context and initialize all agents.

Source code in src/llmling_agent/delegation/pool.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
async def __aenter__(self) -> Self:
    """Enter async context and initialize all agents."""
    async with self._enter_lock:
        if self._running_count == 0:
            try:
                # Initialize MCP manager first, then add aggregating provider
                await self.exit_stack.enter_async_context(self.mcp)
                await self.exit_stack.enter_async_context(self.skills)
                aggregating_provider = self.mcp.get_aggregating_provider()
                skills_provider = self.skills.get_skills_provider()

                agents = list(self.agents.values())
                teams = list(self.teams.values())
                for agent in agents:
                    agent.tools.add_provider(aggregating_provider)
                    agent.tools.add_provider(skills_provider)

                # Collect remaining components to initialize (MCP already initialized)
                components: list[AbstractAsyncContextManager[Any]] = [
                    self.storage,
                    *agents,
                    *teams,
                ]

                # MCP server is now managed externally - removed from pool
                # Initialize all components
                if self.parallel_load:
                    await asyncio.gather(
                        *(self.exit_stack.enter_async_context(c) for c in components)
                    )
                else:
                    for component in components:
                        await self.exit_stack.enter_async_context(component)

            except Exception as e:
                await self.cleanup()
                msg = "Failed to initialize agent pool"
                logger.exception(msg, exc_info=e)
                raise RuntimeError(msg) from e
        self._running_count += 1
    return self

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit async context.

Source code in src/llmling_agent/delegation/pool.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit async context.

    Decrements the nesting counter; when the outermost context exits,
    the pool-level MCP / skills providers are detached from every agent
    and all pool resources are cleaned up.

    Raises:
        ValueError: If called more times than __aenter__.
    """
    async with self._enter_lock:
        # Check the counter *under* the lock: reading it before acquiring
        # the lock raced with concurrent __aenter__/__aexit__ calls
        # (check-then-act on shared state).
        if self._running_count == 0:
            msg = "AgentPool.__aexit__ called more times than __aenter__"
            raise ValueError(msg)
        self._running_count -= 1
        if self._running_count == 0:
            # Remove MCP aggregating provider from all agents
            aggregating_provider = self.mcp.get_aggregating_provider()
            skills_provider = self.skills.get_skills_provider()
            for agent in self.agents.values():
                agent.tools.remove_provider(aggregating_provider.name)
                agent.tools.remove_provider(skills_provider.name)
            await self.cleanup()

__init__

__init__(
    manifest: JoinablePathLike | AgentsManifest | None = None,
    *,
    shared_deps: TPoolDeps | None = None,
    connect_nodes: bool = True,
    input_provider: InputProvider | None = None,
    parallel_load: bool = True,
    event_handlers: list[IndividualEventHandler] | None = None
)

Initialize agent pool with immediate agent creation.

Parameters:

Name Type Description Default
manifest JoinablePathLike | AgentsManifest | None

Agent configuration manifest

None
shared_deps TPoolDeps | None

Dependencies to share across all nodes

None
connect_nodes bool

Whether to set up forwarding connections

True
input_provider InputProvider | None

Input provider for tool / step confirmations / HumanAgents

None
parallel_load bool

Whether to load nodes in parallel (async)

True
event_handlers list[IndividualEventHandler] | None

Event handlers to pass through to all agents

None

Raises:

Type Description
ValueError

If manifest contains invalid node configurations

RuntimeError

If node initialization fails

Source code in src/llmling_agent/delegation/pool.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def __init__(
    self,
    manifest: JoinablePathLike | AgentsManifest | None = None,
    *,
    shared_deps: TPoolDeps | None = None,
    connect_nodes: bool = True,
    input_provider: InputProvider | None = None,
    parallel_load: bool = True,
    event_handlers: list[IndividualEventHandler] | None = None,
):
    """Initialize agent pool with immediate agent creation.

    Agents declared in the manifest are instantiated eagerly, worker
    relationships are wired, teams are created, and (optionally)
    forwarding connections are set up — all synchronously here.

    Args:
        manifest: Agent configuration manifest
        shared_deps: Dependencies to share across all nodes
        connect_nodes: Whether to set up forwarding connections
        input_provider: Input provider for tool / step confirmations / HumanAgents
        parallel_load: Whether to load nodes in parallel (async)
        event_handlers: Event handlers to pass through to all agents

    Raises:
        ValueError: If manifest contains invalid node configurations
        RuntimeError: If node initialization fails
    """
    super().__init__()
    # Local imports, presumably to avoid circular imports at module
    # load time — TODO confirm.
    from llmling_agent.mcp_server.manager import MCPManager
    from llmling_agent.models.manifest import AgentsManifest
    from llmling_agent.observability import registry
    from llmling_agent.skills.manager import SkillsManager
    from llmling_agent.storage import StorageManager

    # Accept: nothing (empty manifest), a path-like (load from file),
    # or an already-constructed manifest object.
    match manifest:
        case None:
            self.manifest = AgentsManifest()
        case str() | os.PathLike() | UPath():
            self.manifest = AgentsManifest.from_file(manifest)
        case AgentsManifest():
            self.manifest = manifest
        case _:
            msg = f"Invalid config path: {manifest}"
            raise ValueError(msg)

    registry.configure_observability(self.manifest.observability)
    self.shared_deps = shared_deps
    self._input_provider = input_provider
    # Single exit stack owns the lifetime of every async component the
    # pool enters (storage, agents, teams, MCP, skills).
    self.exit_stack = AsyncExitStack()
    self.parallel_load = parallel_load
    self.storage = StorageManager(self.manifest.storage)
    self.event_handlers = event_handlers or []

    self.connection_registry = ConnectionRegistry()
    servers = self.manifest.get_mcp_servers()
    self.mcp = MCPManager(name="pool_mcp", servers=servers, owner="pool")
    self.skills = SkillsManager(name="pool_skills", owner="pool")
    self._tasks = TaskRegistry()
    # Register tasks from manifest
    for name, task in self.manifest.jobs.items():
        self._tasks.register(name, task)
    self.process_manager = ProcessManager()
    self.pool_talk = TeamTalk[Any].from_nodes(list(self.nodes.values()))
    # MCP server is now managed externally
    self.server = None
    # Create requested agents immediately
    for name in self.manifest.agents:
        agent = self.manifest.get_agent(
            name,
            deps=shared_deps,
            input_provider=self._input_provider,
            pool=self,
            event_handlers=self.event_handlers,
        )
        self.register(name, agent)

    # Then set up worker relationships (workers may reference any
    # registered node, so all agents must exist first)
    for agent in self.agents.values():
        self.setup_agent_workers(agent)
    self._create_teams()
    # Set up forwarding connections
    if connect_nodes:
        self._connect_nodes()

    # Async-safety fields used by __aenter__/__aexit__ reference counting
    self._enter_lock = Lock()  # Initialize async safety fields
    self._running_count = 0

add_agent async

add_agent(
    name: AgentName,
    *,
    output_type: OutputSpec[TResult] = str,
    **kwargs: Unpack[AgentKwargs]
) -> Agent[Any, TResult]

Add a new permanent agent to the pool.

Parameters:

Name Type Description Default
name AgentName

Name for the new agent

required
output_type OutputSpec[TResult]

Optional type for structured responses:

str
**kwargs Unpack[AgentKwargs]

Additional agent configuration

{}

Returns:

Type Description
Agent[Any, TResult]

An agent instance

Source code in src/llmling_agent/delegation/pool.py
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
async def add_agent[TResult = str](
    self,
    name: AgentName,
    *,
    output_type: OutputSpec[TResult] = str,  # type: ignore[assignment]
    **kwargs: Unpack[AgentKwargs],
) -> Agent[Any, TResult]:
    """Add a new permanent agent to the pool.

    Args:
        name: Name for the new agent
        output_type: Optional type for structured responses:
        **kwargs: Additional agent configuration

    Returns:
        An agent instance
    """
    from llmling_agent.agent import Agent

    if not kwargs.get("event_handlers"):
        kwargs["event_handlers"] = self.event_handlers
    agent: Agent[Any, TResult] = Agent(
        name=name,
        **kwargs,
        output_type=output_type,
        agent_pool=self,
    )
    # Add MCP aggregating provider from manager
    agent.tools.add_provider(self.mcp.get_aggregating_provider())
    agent.tools.add_provider(self.skills.get_skills_provider())
    agent = await self.exit_stack.enter_async_context(agent)
    self.register(name, agent)
    return agent

cleanup async

cleanup() -> None

Clean up all agents.

Source code in src/llmling_agent/delegation/pool.py
225
226
227
228
229
230
async def cleanup(self) -> None:
    """Tear down all pool resources.

    Background processes are stopped first, then the exit stack unwinds
    every entered component, and finally the node registry is emptied.
    """
    await self.process_manager.cleanup()
    await self.exit_stack.aclose()
    self.clear()

create_team

create_team(agents: Sequence[str]) -> Team[TPoolDeps]
create_team(
    agents: Sequence[MessageNode[TDeps, Any]],
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: Agent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None
) -> Team[TDeps]
create_team(
    agents: Sequence[AgentName | MessageNode[Any, Any]],
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: Agent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None
) -> Team[Any]
create_team(
    agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: Agent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None
) -> Team[Any]

Create a group from agent names or instances.

Parameters:

Name Type Description Default
agents Sequence[AgentName | MessageNode[Any, Any]] | None

List of agent names or instances (all if None)

None
name str | None

Optional name for the team

None
description str | None

Optional description for the team

None
shared_prompt str | None

Optional prompt for all agents

None
picker Agent[Any, Any] | None

Agent to use for picking agents

None
num_picks int | None

Number of agents to pick

None
pick_prompt str | None

Prompt to use for picking agents

None
Source code in src/llmling_agent/delegation/pool.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def create_team(
    self,
    agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: Agent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None,
) -> Team[Any]:
    """Build a Team from agent names and/or node instances.

    Args:
        agents: List of agent names or instances (all if None)
        name: Optional name for the team
        description: Optional description for the team
        shared_prompt: Optional prompt for all agents
        picker: Agent to use for picking agents
        num_picks: Number of agents to pick
        pick_prompt: Prompt to use for picking agents
    """
    from llmling_agent.delegation.team import Team

    members = list(self.agents.keys()) if agents is None else agents
    # Strings are looked up in the pool; node instances pass through as-is.
    nodes: list[MessageNode[Any, Any]] = []
    for member in members:
        nodes.append(self.get_agent(member) if isinstance(member, str) else member)
    group = Team(
        name=name,
        description=description,
        agents=nodes,
        shared_prompt=shared_prompt,
        picker=picker,
        num_picks=num_picks,
        pick_prompt=pick_prompt,
    )
    # A named team also gets registered in the pool for later lookup.
    if name:
        self[name] = group
    return group

create_team_run

create_team_run(
    agents: Sequence[str],
    validator: MessageNode[Any, TResult] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: Agent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None
) -> TeamRun[TPoolDeps, TResult]
create_team_run(
    agents: Sequence[MessageNode[TDeps, Any]],
    validator: MessageNode[Any, TResult] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: Agent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None
) -> TeamRun[TDeps, TResult]
create_team_run(
    agents: Sequence[AgentName | MessageNode[Any, Any]],
    validator: MessageNode[Any, TResult] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: Agent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None
) -> TeamRun[Any, TResult]
create_team_run(
    agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
    validator: MessageNode[Any, TResult] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: Agent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None
) -> TeamRun[Any, TResult]

Create a sequential TeamRun from a list of Agents.

Parameters:

Name Type Description Default
agents Sequence[AgentName | MessageNode[Any, Any]] | None

List of agent names or team/agent instances (all if None)

None
validator MessageNode[Any, TResult] | None

Node to validate the results of the TeamRun

None
name str | None

Optional name for the team

None
description str | None

Optional description for the team

None
shared_prompt str | None

Optional prompt for all agents

None
picker Agent[Any, Any] | None

Agent to use for picking agents

None
num_picks int | None

Number of agents to pick

None
pick_prompt str | None

Prompt to use for picking agents

None
Source code in src/llmling_agent/delegation/pool.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def create_team_run[TResult](
    self,
    agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
    validator: MessageNode[Any, TResult] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: Agent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None,
) -> TeamRun[Any, TResult]:
    """Create a a sequential TeamRun from a list of Agents.

    Args:
        agents: List of agent names or team/agent instances (all if None)
        validator: Node to validate the results of the TeamRun
        name: Optional name for the team
        description: Optional description for the team
        shared_prompt: Optional prompt for all agents
        picker: Agent to use for picking agents
        num_picks: Number of agents to pick
        pick_prompt: Prompt to use for picking agents
    """
    from llmling_agent.delegation.teamrun import TeamRun

    if agents is None:
        agents = list(self.agents.keys())

    # First resolve/configure agents
    resolved_agents: list[MessageNode[Any, Any]] = []
    for agent in agents:
        if isinstance(agent, str):
            agent = self.get_agent(agent)
        resolved_agents.append(agent)
    team = TeamRun(
        resolved_agents,
        name=name,
        description=description,
        validator=validator,
        shared_prompt=shared_prompt,
        picker=picker,
        num_picks=num_picks,
        pick_prompt=pick_prompt,
    )
    if name:
        self[name] = team
    return team

get_agent

get_agent(
    agent: AgentName | Agent[Any, str],
    *,
    return_type: type[TResult] = str,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None
) -> Agent[TPoolDeps, TResult]
get_agent(
    agent: AgentName | Agent[Any, str],
    *,
    deps_type: type[TCustomDeps],
    return_type: type[TResult] = str,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None
) -> Agent[TCustomDeps, TResult]
get_agent(
    agent: AgentName | Agent[Any, str],
    *,
    deps_type: Any | None = None,
    return_type: Any = str,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None
) -> Agent[Any, Any]

Get or configure an agent from the pool.

This method provides flexible agent configuration with dependency injection: without a custom deps_type, the agent uses the pool's shared dependencies; when deps_type is provided, the agent uses the supplied custom dependencies instead.

Parameters:

Name Type Description Default
agent AgentName | Agent[Any, str]

Either agent name or instance

required
deps_type Any | None

Optional custom dependencies type (overrides shared deps)

None
return_type Any

Optional type for structured responses

str
model_override str | None

Optional model override

None
session SessionIdType | SessionQuery

Optional session ID or query to recover conversation

None

Returns:

Name Type Description
Either Agent[Any, Any]
Agent[Any, Any]
  • Agent[TPoolDeps] when using pool's shared deps
Agent[Any, Any]
  • Agent[TCustomDeps] when custom deps provided

Raises:

Type Description
KeyError

If agent name not found

ValueError

If configuration is invalid

Source code in src/llmling_agent/delegation/pool.py
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
def get_agent(
    self,
    agent: AgentName | Agent[Any, str],
    *,
    deps_type: Any | None = None,
    return_type: Any = str,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
) -> Agent[Any, Any]:
    """Get or configure an agent from the pool.

    This method provides flexible agent configuration with dependency injection:
    - Without deps: Agent uses pool's shared dependencies
    - With deps: Agent uses provided custom dependencies

    Args:
        agent: Either agent name or instance
        deps_type: Optional custom dependencies type (overrides shared deps)
        return_type: Optional type for structured responses
        model_override: Optional model override
        session: Optional session ID or query to recover conversation

    Returns:
        Either:
        - Agent[TPoolDeps] when using pool's shared deps
        - Agent[TCustomDeps] when custom deps provided

    Raises:
        KeyError: If agent name not found
        ValueError: If configuration is invalid
    """
    from llmling_agent.agent import Agent

    base = agent if isinstance(agent, Agent) else self.agents[agent]
    # Only override deps_type when the caller actually supplied one:
    # assigning unconditionally clobbered an agent's existing deps_type
    # with None, contradicting the "optional override" contract above.
    if deps_type is not None:
        base.deps_type = deps_type
    base.context.pool = self
    if model_override:
        base.set_model(model_override)
    if session:
        base.conversation.load_history_from_database(session=session)
    if return_type not in {str, None}:
        base.to_structured(return_type)

    return base

get_mermaid_diagram

get_mermaid_diagram(include_details: bool = True) -> str

Generate mermaid flowchart of all agents and their connections.

Parameters:

Name Type Description Default
include_details bool

Whether to show connection details (types, queues, etc)

True
Source code in src/llmling_agent/delegation/pool.py
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
def get_mermaid_diagram(self, include_details: bool = True) -> str:
    """Render the pool's agents and their connections as a mermaid flowchart.

    Args:
        include_details: Whether to show connection details (types, queues, etc)
    """
    lines = ["flowchart LR"]

    # One node per registered agent.
    lines.extend(f"    {agent_name}[{agent_name}]" for agent_name in self.agents)

    # One edge per (connection, target) pair.
    for node in self.agents.values():
        for talk in node.connections.get_connections():
            origin = talk.source.name
            for target in talk.targets:
                if not include_details:
                    lines.append(f"    {origin}-->{target.name}")
                    continue
                parts: list[str] = [talk.connection_type]
                if talk.queued:
                    parts.append(f"queued({talk.queue_strategy})")
                for prefix, condition in (
                    ("filter", talk.filter_condition),
                    ("stop", talk.stop_condition),
                    ("exit", talk.exit_condition),
                ):
                    if condition:
                        parts.append(f"{prefix}:{condition.__name__}")
                label = f"|{' '.join(parts)}|" if parts else ""
                lines.append(f"    {origin}--{label}-->{target.name}")

    return "\n".join(lines)

list_nodes

list_nodes() -> list[str]

List available agent names.

Source code in src/llmling_agent/delegation/pool.py
608
609
610
def list_nodes(self) -> list[str]:
    """Return the names of all registered nodes."""
    return [*self.list_items()]

run_event_loop async

run_event_loop() -> None

Run pool in event-watching mode until interrupted.

Source code in src/llmling_agent/delegation/pool.py
403
404
405
406
407
408
409
410
411
async def run_event_loop(self) -> None:
    """Run pool in event-watching mode until interrupted."""
    active = ", ".join(self.list_nodes())
    print("Starting event watch mode...")
    print("Active nodes: ", active)
    print("Press Ctrl+C to stop")

    # Idle forever; a KeyboardInterrupt ends the watch loop cleanly.
    with suppress(KeyboardInterrupt):
        while True:
            await asyncio.sleep(1)

setup_agent_workers

setup_agent_workers(agent: Agent[Any, Any]) -> None

Set up workers for an agent from configuration.

Source code in src/llmling_agent/delegation/pool.py
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
def setup_agent_workers(self, agent: Agent[Any, Any]) -> None:
    """Set up workers for an agent from configuration.

    Raises:
        ValueError: If a configured worker name is not registered.
    """
    for cfg in agent.context.config.workers:
        try:
            node = self.nodes[cfg.name]
            if isinstance(cfg, TeamWorkerConfig):
                agent.register_worker(node)
            elif isinstance(cfg, AgentWorkerConfig):
                agent.register_worker(
                    node,
                    reset_history_on_run=cfg.reset_history_on_run,
                    pass_message_history=cfg.pass_message_history,
                )
        except KeyError as e:
            msg = f"Worker agent {cfg.name!r} not found"
            raise ValueError(msg) from e

track_message_flow async

track_message_flow() -> AsyncIterator[MessageFlowTracker]

Track message flow during a context.

Source code in src/llmling_agent/delegation/pool.py
393
394
395
396
397
398
399
400
401
@asynccontextmanager
async def track_message_flow(self) -> AsyncIterator[MessageFlowTracker]:
    """Yield a tracker that records message flow while the context is open.

    The tracker is subscribed to the pool's message-flow signal on entry
    and always unsubscribed on exit, even if the body raises.
    """
    flow_tracker = MessageFlowTracker()
    self.connection_registry.message_flow.connect(flow_tracker.track)
    try:
        yield flow_tracker
    finally:
        self.connection_registry.message_flow.disconnect(flow_tracker.track)

Show source on GitHub