Skip to content

AgentPool

Base classes

Name Children Inherits
BaseRegistry
llmling.core.baseregistry
Base class for registries providing item storage and change notifications.
Generic
typing
Abstract base class for generic types.

⋔ Inheritance diagram

graph TD
  94004562030144["pool.AgentPool"]
  94004551491552["baseregistry.BaseRegistry"]
  94004506342384["abc.MutableMapping"]
  94004506338416["abc.Mapping"]
  94004506332464["abc.Collection"]
  94004506330480["abc.Sized"]
  140104485245120["builtins.object"]
  94004506326512["abc.Iterable"]
  94004506331472["abc.Container"]
  94004506135904["abc.ABC"]
  94004505984624["typing.Generic"]
  94004551491552 --> 94004562030144
  94004506342384 --> 94004551491552
  94004506338416 --> 94004506342384
  94004506332464 --> 94004506338416
  94004506330480 --> 94004506332464
  140104485245120 --> 94004506330480
  94004506326512 --> 94004506332464
  140104485245120 --> 94004506326512
  94004506331472 --> 94004506332464
  140104485245120 --> 94004506331472
  94004506135904 --> 94004551491552
  140104485245120 --> 94004506135904
  94004505984624 --> 94004551491552
  140104485245120 --> 94004505984624
  94004505984624 --> 94004562030144

🛈 DocStrings

Bases: BaseRegistry[NodeName, MessageEmitter[Any, Any]]

Pool managing message processing nodes (agents and teams).

Acts as a unified registry for all nodes, providing: - Centralized node management and lookup - Shared dependency injection - Connection management - Resource coordination

Nodes can be accessed through: - nodes: All registered nodes (agents and teams) - agents: Only Agent instances - teams: Only Team instances

Source code in src/llmling_agent/delegation/pool.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
class AgentPool[TPoolDeps](BaseRegistry[NodeName, MessageEmitter[Any, Any]]):
    """Pool managing message processing nodes (agents and teams).

    Acts as a unified registry for all nodes, providing:
    - Centralized node management and lookup
    - Shared dependency injection
    - Connection management
    - Resource coordination

    Nodes can be accessed through:
    - nodes: All registered nodes (agents and teams)
    - agents: Only Agent instances
    - teams: Only Team instances
    """

    def __init__(
        self,
        manifest: StrPath | AgentsManifest | None = None,
        *,
        shared_deps: TPoolDeps | None = None,
        connect_nodes: bool = True,
        input_provider: InputProvider | None = None,
        parallel_load: bool = True,
    ):
        """Initialize agent pool with immediate agent creation.

        Args:
            manifest: Agent configuration manifest
            shared_deps: Dependencies to share across all nodes
            connect_nodes: Whether to set up forwarding connections
            input_provider: Input provider for tool / step confirmations / HumanAgents
            parallel_load: Whether to load nodes in parallel (async)

        Raises:
            ValueError: If manifest contains invalid node configurations
            RuntimeError: If node initialization fails
        """
        super().__init__()
        from llmling_agent.models.manifest import AgentsManifest
        from llmling_agent.storage import StorageManager

        match manifest:
            case None:
                self.manifest = AgentsManifest()
            case str():
                self.manifest = AgentsManifest.from_file(manifest)
            case AgentsManifest():
                self.manifest = manifest
            case _:
                msg = f"Invalid config path: {manifest}"
                raise ValueError(msg)
        self.shared_deps = shared_deps
        self._input_provider = input_provider
        self.exit_stack = AsyncExitStack()
        self.parallel_load = parallel_load
        self.storage = StorageManager(self.manifest.storage)
        self.connection_registry = ConnectionRegistry()
        self.mcp = MCPManager(
            name="pool_mcp", servers=self.manifest.get_mcp_servers(), owner="pool"
        )
        self._tasks = TaskRegistry()
        # Register tasks from manifest
        for name, task in self.manifest.jobs.items():
            self._tasks.register(name, task)
        self.pool_talk = TeamTalk[Any].from_nodes(list(self.nodes.values()))
        if self.manifest.pool_server and self.manifest.pool_server.enabled:
            from llmling_agent.resource_providers.pool import PoolResourceProvider
            from llmling_agent_mcp.server import LLMLingServer

            provider = PoolResourceProvider(
                self, zed_mode=self.manifest.pool_server.zed_mode
            )
            self.server: LLMLingServer | None = LLMLingServer(
                provider=provider,
                config=self.manifest.pool_server,
            )
        else:
            self.server = None
        # Create requested agents immediately
        for name in self.manifest.agents:
            agent = self.manifest.get_agent(name, deps=shared_deps)
            self.register(name, agent)

        # Then set up worker relationships
        for agent in self.agents.values():
            self.setup_agent_workers(agent)
        self._create_teams()
        # Set up forwarding connections
        if connect_nodes:
            self._connect_nodes()

    async def __aenter__(self) -> Self:
        """Enter async context and initialize all agents."""
        try:
            # Add MCP tool provider to all agents
            agents = list(self.agents.values())
            teams = list(self.teams.values())
            for agent in agents:
                agent.tools.add_provider(self.mcp)

            # Collect all components to initialize
            components: list[AbstractAsyncContextManager[Any]] = [
                self.mcp,
                *agents,
                *teams,
            ]

            # Add MCP server if configured
            if self.server:
                components.append(self.server)

            # Initialize all components
            if self.parallel_load:
                await asyncio.gather(
                    *(self.exit_stack.enter_async_context(c) for c in components)
                )
            else:
                for component in components:
                    await self.exit_stack.enter_async_context(component)

        except Exception as e:
            await self.cleanup()
            msg = "Failed to initialize agent pool"
            logger.exception(msg, exc_info=e)
            raise RuntimeError(msg) from e
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ):
        """Exit async context."""
        # Remove MCP tool provider from all agents
        for agent in self.agents.values():
            if self.mcp in agent.tools.providers:
                agent.tools.remove_provider(self.mcp)
        await self.cleanup()

    async def cleanup(self):
        """Clean up all agents."""
        await self.exit_stack.aclose()
        self.clear()

    @overload
    def create_team_run(
        self,
        agents: Sequence[str],
        validator: MessageNode[Any, TResult] | None = None,
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> TeamRun[TPoolDeps, TResult]: ...

    @overload
    def create_team_run[TDeps, TResult](
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        validator: MessageNode[Any, TResult] | None = None,
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> TeamRun[TDeps, TResult]: ...

    @overload
    def create_team_run(
        self,
        agents: Sequence[AgentName | MessageNode[Any, Any]],
        validator: MessageNode[Any, TResult] | None = None,
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> TeamRun[Any, TResult]: ...

    def create_team_run(
        self,
        agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
        validator: MessageNode[Any, TResult] | None = None,
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> TeamRun[Any, TResult]:
        """Create a a sequential TeamRun from a list of Agents.

        Args:
            agents: List of agent names or team/agent instances (all if None)
            validator: Node to validate the results of the TeamRun
            name: Optional name for the team
            description: Optional description for the team
            shared_prompt: Optional prompt for all agents
            picker: Agent to use for picking agents
            num_picks: Number of agents to pick
            pick_prompt: Prompt to use for picking agents
        """
        from llmling_agent.delegation.teamrun import TeamRun

        if agents is None:
            agents = list(self.agents.keys())

        # First resolve/configure agents
        resolved_agents: list[MessageNode[Any, Any]] = []
        for agent in agents:
            if isinstance(agent, str):
                agent = self.get_agent(agent)
            resolved_agents.append(agent)
        team = TeamRun(
            resolved_agents,
            name=name,
            description=description,
            validator=validator,
            shared_prompt=shared_prompt,
            picker=picker,
            num_picks=num_picks,
            pick_prompt=pick_prompt,
        )
        if name:
            self[name] = team
        return team

    @overload
    def create_team(self, agents: Sequence[str]) -> Team[TPoolDeps]: ...

    @overload
    def create_team[TDeps](
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> Team[TDeps]: ...

    @overload
    def create_team(
        self,
        agents: Sequence[AgentName | MessageNode[Any, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> Team[Any]: ...

    def create_team(
        self,
        agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ) -> Team[Any]:
        """Create a group from agent names or instances.

        Args:
            agents: List of agent names or instances (all if None)
            name: Optional name for the team
            description: Optional description for the team
            shared_prompt: Optional prompt for all agents
            picker: Agent to use for picking agents
            num_picks: Number of agents to pick
            pick_prompt: Prompt to use for picking agents
        """
        from llmling_agent.delegation.team import Team

        if agents is None:
            agents = list(self.agents.keys())

        # First resolve/configure agents
        resolved_agents: list[MessageNode[Any, Any]] = []
        for agent in agents:
            if isinstance(agent, str):
                agent = self.get_agent(agent)
            resolved_agents.append(agent)

        team = Team(
            name=name,
            description=description,
            agents=resolved_agents,
            shared_prompt=shared_prompt,
            picker=picker,
            num_picks=num_picks,
            pick_prompt=pick_prompt,
        )
        if name:
            self[name] = team
        return team

    @asynccontextmanager
    async def track_message_flow(self) -> AsyncIterator[MessageFlowTracker]:
        """Track message flow during a context."""
        tracker = MessageFlowTracker()
        self.connection_registry.message_flow.connect(tracker.track)
        try:
            yield tracker
        finally:
            self.connection_registry.message_flow.disconnect(tracker.track)

    async def run_event_loop(self):
        """Run pool in event-watching mode until interrupted."""
        import sys

        print("Starting event watch mode...")
        print("Active nodes: ", ", ".join(self.list_nodes()))
        print("Press Ctrl+C to stop")

        stop_event = asyncio.Event()

        if sys.platform != "win32":
            # Unix: Use signal handlers
            loop = asyncio.get_running_loop()
            for sig in (signal.SIGINT, signal.SIGTERM):
                loop.add_signal_handler(sig, stop_event.set)
            while True:
                await asyncio.sleep(1)
        else:
            # Windows: Use keyboard interrupt
            with suppress(KeyboardInterrupt):
                while True:
                    await asyncio.sleep(1)

    @property
    def agents(self) -> dict[str, AnyAgent[Any, Any]]:
        """Get agents dict (backward compatibility)."""
        return {
            i.name: i
            for i in self._items.values()
            if isinstance(i, Agent | StructuredAgent)
        }

    @property
    def teams(self) -> dict[str, BaseTeam[Any, Any]]:
        """Get agents dict (backward compatibility)."""
        from llmling_agent.delegation.base_team import BaseTeam

        return {i.name: i for i in self._items.values() if isinstance(i, BaseTeam)}

    @property
    def nodes(self) -> dict[str, MessageNode[Any, Any]]:
        """Get agents dict (backward compatibility)."""
        from llmling_agent.messaging.messagenode import MessageNode

        return {i.name: i for i in self._items.values() if isinstance(i, MessageNode)}

    @property
    def event_nodes(self) -> dict[str, EventNode[Any]]:
        """Get agents dict (backward compatibility)."""
        from llmling_agent.messaging.eventnode import EventNode

        return {i.name: i for i in self._items.values() if isinstance(i, EventNode)}

    @property
    def node_events(self) -> DictEvents:
        """Get node events."""
        return self._items.events

    @property
    def _error_class(self) -> type[LLMLingError]:
        """Error class for agent operations."""
        return LLMLingError

    def _validate_item(
        self, item: MessageEmitter[Any, Any] | Any
    ) -> MessageEmitter[Any, Any]:
        """Validate and convert items before registration.

        Args:
            item: Item to validate

        Returns:
            Validated Node

        Raises:
            LLMlingError: If item is not a valid node
        """
        if not isinstance(item, MessageEmitter):
            msg = f"Item must be Agent or Team, got {type(item)}"
            raise self._error_class(msg)
        item.context.pool = self
        return item

    def _create_teams(self):
        """Create all teams in two phases to allow nesting."""
        # Phase 1: Create empty teams

        empty_teams: dict[str, BaseTeam[Any, Any]] = {}
        for name, config in self.manifest.teams.items():
            if config.mode == "parallel":
                empty_teams[name] = Team(
                    [], name=name, shared_prompt=config.shared_prompt
                )
            else:
                empty_teams[name] = TeamRun(
                    [], name=name, shared_prompt=config.shared_prompt
                )

        # Phase 2: Resolve members
        for name, config in self.manifest.teams.items():
            team = empty_teams[name]
            members: list[MessageNode[Any, Any]] = []
            for member in config.members:
                if member in self.agents:
                    members.append(self.agents[member])
                elif member in empty_teams:
                    members.append(empty_teams[member])
                else:
                    msg = f"Unknown team member: {member}"
                    raise ValueError(msg)
            team.agents.extend(members)
            self[name] = team

    def _connect_nodes(self):
        """Set up connections defined in manifest."""
        # Merge agent and team configs into one dict of nodes with connections
        for name, config in self.manifest.nodes.items():
            source = self[name]
            for target in config.connections or []:
                match target:
                    case NodeConnectionConfig():
                        if target.name not in self:
                            msg = f"Forward target {target.name} not found for {name}"
                            raise ValueError(msg)
                        target_node = self[target.name]
                    case FileConnectionConfig() | CallableConnectionConfig():
                        target_node = Agent(provider=target.get_provider())
                    case _:
                        msg = f"Invalid connection config: {target}"
                        raise ValueError(msg)

                source.connect_to(
                    target_node,  # type: ignore  # recognized as "Any | BaseTeam[Any, Any]" by mypy?
                    connection_type=target.connection_type,
                    name=name,
                    priority=target.priority,
                    delay=target.delay,
                    queued=target.queued,
                    queue_strategy=target.queue_strategy,
                    transform=target.transform,
                    filter_condition=target.filter_condition.check
                    if target.filter_condition
                    else None,
                    stop_condition=target.stop_condition.check
                    if target.stop_condition
                    else None,
                    exit_condition=target.exit_condition.check
                    if target.exit_condition
                    else None,
                )
                source.connections.set_wait_state(
                    target_node,
                    wait=target.wait_for_completion,
                )

    @overload
    async def clone_agent[TDeps](
        self,
        agent: AgentName | Agent[TDeps],
        new_name: AgentName | None = None,
        *,
        system_prompts: list[str] | None = None,
        template_context: dict[str, Any] | None = None,
    ) -> Agent[TDeps]: ...

    @overload
    async def clone_agent[TDeps, TResult](
        self,
        agent: StructuredAgent[TDeps, TResult],
        new_name: AgentName | None = None,
        *,
        system_prompts: list[str] | None = None,
        template_context: dict[str, Any] | None = None,
    ) -> StructuredAgent[TDeps, TResult]: ...

    async def clone_agent[TDeps, TAgentResult](
        self,
        agent: AgentName | AnyAgent[TDeps, TAgentResult],
        new_name: AgentName | None = None,
        *,
        system_prompts: list[str] | None = None,
        template_context: dict[str, Any] | None = None,
    ) -> AnyAgent[TDeps, TAgentResult]:
        """Create a copy of an agent.

        Args:
            agent: Agent instance or name to clone
            new_name: Optional name for the clone
            system_prompts: Optional different prompts
            template_context: Variables for template rendering

        Returns:
            The new agent instance
        """
        from llmling_agent.agent import Agent, StructuredAgent

        # Get original config
        if isinstance(agent, str):
            if agent not in self.manifest.agents:
                msg = f"Agent {agent} not found"
                raise KeyError(msg)
            config = self.manifest.agents[agent]
            original_agent: AnyAgent[Any, Any] = self.get_agent(agent)
        else:
            config = agent.context.config  # type: ignore
            original_agent = agent

        # Create new config
        new_config = config.model_copy(deep=True)

        # Apply overrides
        if system_prompts:
            new_config.system_prompts = system_prompts

        # Handle template rendering
        if template_context:
            new_config.system_prompts = new_config.render_system_prompts(template_context)

        # Create new agent with same runtime
        new_agent = Agent[TDeps](
            runtime=original_agent.runtime,
            context=original_agent.context,
            # result_type=original_agent.actual_type,
            provider=new_config.get_provider(),
            system_prompt=new_config.system_prompts,
            name=new_name or f"{config.name}_copy_{len(self.agents)}",
        )
        if isinstance(original_agent, StructuredAgent):
            new_agent = new_agent.to_structured(original_agent.actual_type)

        # Register in pool
        agent_name = new_agent.name
        self.manifest.agents[agent_name] = new_config
        self.agents[agent_name] = new_agent
        return await self.exit_stack.enter_async_context(new_agent)

    @overload
    async def create_agent(
        self,
        name: AgentName,
        *,
        session: SessionIdType | SessionQuery = None,
        name_override: str | None = None,
    ) -> Agent[TPoolDeps]: ...

    @overload
    async def create_agent[TCustomDeps](
        self,
        name: AgentName,
        *,
        deps: TCustomDeps,
        session: SessionIdType | SessionQuery = None,
        name_override: str | None = None,
    ) -> Agent[TCustomDeps]: ...

    @overload
    async def create_agent[TResult](
        self,
        name: AgentName,
        *,
        return_type: type[TResult],
        session: SessionIdType | SessionQuery = None,
        name_override: str | None = None,
    ) -> StructuredAgent[TPoolDeps, TResult]: ...

    @overload
    async def create_agent[TCustomDeps, TResult](
        self,
        name: AgentName,
        *,
        deps: TCustomDeps,
        return_type: type[TResult],
        session: SessionIdType | SessionQuery = None,
        name_override: str | None = None,
    ) -> StructuredAgent[TCustomDeps, TResult]: ...

    async def create_agent(
        self,
        name: AgentName,
        *,
        deps: Any | None = None,
        return_type: Any | None = None,
        session: SessionIdType | SessionQuery = None,
        name_override: str | None = None,
    ) -> AnyAgent[Any, Any]:
        """Create a new agent instance from configuration.

        Args:
            name: Name of the agent configuration to use
            deps: Optional custom dependencies (overrides pool deps)
            return_type: Optional type for structured responses
            session: Optional session ID or query to recover conversation
            name_override: Optional different name for this instance

        Returns:
            New agent instance with the specified configuration

        Raises:
            KeyError: If agent configuration not found
            ValueError: If configuration is invalid
        """
        if name not in self.manifest.agents:
            msg = f"Agent configuration {name!r} not found"
            raise KeyError(msg)

        # Use Manifest.get_agent for proper initialization
        final_deps = deps if deps is not None else self.shared_deps
        agent = self.manifest.get_agent(name, deps=final_deps)
        # Override name if requested
        if name_override:
            agent.name = name_override

        # Set pool reference
        agent.context.pool = self

        # Handle session if provided
        if session:
            agent.conversation.load_history_from_database(session=session)

        # Initialize agent through exit stack
        agent = await self.exit_stack.enter_async_context(agent)

        # Override structured configuration if provided
        if return_type is not None:
            return agent.to_structured(return_type)

        return agent

    def setup_agent_workers(self, agent: AnyAgent[Any, Any]):
        """Set up workers for an agent from configuration."""
        for worker_config in agent.context.config.workers:
            try:
                worker = self.get_agent(worker_config.name)
                agent.register_worker(
                    worker,
                    name=worker_config.name,
                    reset_history_on_run=worker_config.reset_history_on_run,
                    pass_message_history=worker_config.pass_message_history,
                    share_context=worker_config.share_context,
                )
            except KeyError as e:
                msg = f"Worker agent {worker_config.name!r} not found"
                raise ValueError(msg) from e

    @overload
    def get_agent(
        self,
        agent: AgentName | Agent[Any],
        *,
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
    ) -> Agent[TPoolDeps]: ...

    @overload
    def get_agent[TResult](
        self,
        agent: AgentName | Agent[Any],
        *,
        return_type: type[TResult],
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
    ) -> StructuredAgent[TPoolDeps, TResult]: ...

    @overload
    def get_agent[TCustomDeps](
        self,
        agent: AgentName | Agent[Any],
        *,
        deps: TCustomDeps,
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
    ) -> Agent[TCustomDeps]: ...

    @overload
    def get_agent[TCustomDeps, TResult](
        self,
        agent: AgentName | Agent[Any],
        *,
        deps: TCustomDeps,
        return_type: type[TResult],
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
    ) -> StructuredAgent[TCustomDeps, TResult]: ...

    def get_agent(
        self,
        agent: AgentName | Agent[Any],
        *,
        deps: Any | None = None,
        return_type: Any | None = None,
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
    ) -> AnyAgent[Any, Any]:
        """Get or configure an agent from the pool.

        This method provides flexible agent configuration with dependency injection:
        - Without deps: Agent uses pool's shared dependencies
        - With deps: Agent uses provided custom dependencies
        - With return_type: Returns a StructuredAgent with type validation

        Args:
            agent: Either agent name or instance
            deps: Optional custom dependencies (overrides shared deps)
            return_type: Optional type for structured responses
            model_override: Optional model override
            session: Optional session ID or query to recover conversation

        Returns:
            Either:
            - Agent[TPoolDeps] when using pool's shared deps
            - Agent[TCustomDeps] when custom deps provided
            - StructuredAgent when return_type provided

        Raises:
            KeyError: If agent name not found
            ValueError: If configuration is invalid
        """
        from llmling_agent.agent import Agent
        from llmling_agent.agent.context import AgentContext

        # Get base agent
        base = agent if isinstance(agent, Agent) else self.agents[agent]

        # Setup context and dependencies
        if base.context is None:
            base.context = AgentContext[Any].create_default(base.name)

        # Use custom deps if provided, otherwise use shared deps
        base.context.data = deps if deps is not None else self.shared_deps
        base.context.pool = self

        # Apply overrides
        if model_override:
            base.set_model(model_override)

        if session:
            base.conversation.load_history_from_database(session=session)

        # Convert to structured if needed
        if return_type is not None:
            return base.to_structured(return_type)

        return base

    def list_nodes(self) -> list[str]:
        """List available agent names."""
        return list(self.list_items())

    def get_job(self, name: str) -> Job[Any, Any]:
        return self._tasks[name]

    def register_task(self, name: str, task: Job[Any, Any]):
        self._tasks.register(name, task)

    @overload
    async def add_agent(
        self,
        name: AgentName,
        *,
        result_type: None = None,
        **kwargs: Unpack[AgentKwargs],
    ) -> Agent[Any]: ...

    @overload
    async def add_agent[TResult](
        self,
        name: AgentName,
        *,
        result_type: type[TResult] | str | ResponseDefinition,
        **kwargs: Unpack[AgentKwargs],
    ) -> StructuredAgent[Any, TResult]: ...

    async def add_agent(
        self,
        name: AgentName,
        *,
        result_type: type[Any] | str | ResponseDefinition | None = None,
        **kwargs: Unpack[AgentKwargs],
    ) -> Agent[Any] | StructuredAgent[Any, Any]:
        """Add a new permanent agent to the pool.

        Args:
            name: Name for the new agent
            result_type: Optional type for structured responses:
                - None: Regular unstructured agent
                - type: Python type for validation
                - str: Name of response definition
                - ResponseDefinition: Complete response definition
            **kwargs: Additional agent configuration

        Returns:
            Either a regular Agent or StructuredAgent depending on result_type
        """
        from llmling_agent.agent import Agent

        agent: AnyAgent[Any, Any] = Agent(name=name, **kwargs)
        agent.tools.add_provider(self.mcp)
        agent = await self.exit_stack.enter_async_context(agent)
        # Convert to structured if needed
        if result_type is not None:
            agent = agent.to_structured(result_type)
        self.register(name, agent)
        return agent

    def get_mermaid_diagram(
        self,
        include_details: bool = True,
    ) -> str:
        """Generate mermaid flowchart of all agents and their connections.

        Args:
            include_details: Whether to show connection details (types, queues, etc)
        """
        lines = ["flowchart LR"]

        # Add all agents as nodes
        for name in self.agents:
            lines.append(f"    {name}[{name}]")  # noqa: PERF401

        # Add all connections as edges
        for agent in self.agents.values():
            connections = agent.connections.get_connections()
            for talk in connections:
                talk = cast(Talk[Any], talk)  # help mypy understand it's a Talk
                source = talk.source.name
                for target in talk.targets:
                    if include_details:
                        details: list[str] = []
                        details.append(talk.connection_type)
                        if talk.queued:
                            details.append(f"queued({talk.queue_strategy})")
                        if fn := talk.filter_condition:  # type: ignore
                            details.append(f"filter:{fn.__name__}")
                        if fn := talk.stop_condition:  # type: ignore
                            details.append(f"stop:{fn.__name__}")
                        if fn := talk.exit_condition:  # type: ignore
                            details.append(f"exit:{fn.__name__}")

                        label = f"|{' '.join(details)}|" if details else ""
                        lines.append(f"    {source}--{label}-->{target.name}")
                    else:
                        lines.append(f"    {source}-->{target.name}")

        return "\n".join(lines)

_error_class property

_error_class: type[LLMLingError]

Error class for agent operations.

agents property

agents: dict[str, AnyAgent[Any, Any]]

Get agents dict (backward compatibility).

event_nodes property

event_nodes: dict[str, EventNode[Any]]

Get agents dict (backward compatibility).

node_events property

node_events: DictEvents

Get node events.

nodes property

nodes: dict[str, MessageNode[Any, Any]]

Get agents dict (backward compatibility).

teams property

teams: dict[str, BaseTeam[Any, Any]]

Get agents dict (backward compatibility).

__aenter__ async

__aenter__() -> Self

Enter async context and initialize all agents.

Source code in src/llmling_agent/delegation/pool.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
async def __aenter__(self) -> Self:
    """Enter async context and initialize all agents."""
    try:
        # Add MCP tool provider to all agents
        agents = list(self.agents.values())
        teams = list(self.teams.values())
        for agent in agents:
            agent.tools.add_provider(self.mcp)

        # Collect all components to initialize
        components: list[AbstractAsyncContextManager[Any]] = [
            self.mcp,
            *agents,
            *teams,
        ]

        # Add MCP server if configured
        if self.server:
            components.append(self.server)

        # Initialize all components
        if self.parallel_load:
            await asyncio.gather(
                *(self.exit_stack.enter_async_context(c) for c in components)
            )
        else:
            for component in components:
                await self.exit_stack.enter_async_context(component)

    except Exception as e:
        await self.cleanup()
        msg = "Failed to initialize agent pool"
        logger.exception(msg, exc_info=e)
        raise RuntimeError(msg) from e
    return self

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
)

Exit async context.

Source code in src/llmling_agent/delegation/pool.py
188
189
190
191
192
193
194
195
196
197
198
199
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
):
    """Exit async context."""
    # Remove MCP tool provider from all agents
    for agent in self.agents.values():
        if self.mcp in agent.tools.providers:
            agent.tools.remove_provider(self.mcp)
    await self.cleanup()

__init__

__init__(
    manifest: StrPath | AgentsManifest | None = None,
    *,
    shared_deps: TPoolDeps | None = None,
    connect_nodes: bool = True,
    input_provider: InputProvider | None = None,
    parallel_load: bool = True,
)

Initialize agent pool with immediate agent creation.

Parameters:

Name Type Description Default
manifest StrPath | AgentsManifest | None

Agent configuration manifest

None
shared_deps TPoolDeps | None

Dependencies to share across all nodes

None
connect_nodes bool

Whether to set up forwarding connections

True
input_provider InputProvider | None

Input provider for tool / step confirmations / HumanAgents

None
parallel_load bool

Whether to load nodes in parallel (async)

True

Raises:

Type Description
ValueError

If manifest contains invalid node configurations

RuntimeError

If node initialization fails

Source code in src/llmling_agent/delegation/pool.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def __init__(
    self,
    manifest: StrPath | AgentsManifest | None = None,
    *,
    shared_deps: TPoolDeps | None = None,
    connect_nodes: bool = True,
    input_provider: InputProvider | None = None,
    parallel_load: bool = True,
):
    """Initialize agent pool with immediate agent creation.

    Args:
        manifest: Agent configuration manifest
        shared_deps: Dependencies to share across all nodes
        connect_nodes: Whether to set up forwarding connections
        input_provider: Input provider for tool / step confirmations / HumanAgents
        parallel_load: Whether to load nodes in parallel (async)

    Raises:
        ValueError: If manifest contains invalid node configurations
        RuntimeError: If node initialization fails
    """
    super().__init__()
    from llmling_agent.models.manifest import AgentsManifest
    from llmling_agent.storage import StorageManager

    match manifest:
        case None:
            self.manifest = AgentsManifest()
        case str():
            self.manifest = AgentsManifest.from_file(manifest)
        case AgentsManifest():
            self.manifest = manifest
        case _:
            msg = f"Invalid config path: {manifest}"
            raise ValueError(msg)
    self.shared_deps = shared_deps
    self._input_provider = input_provider
    self.exit_stack = AsyncExitStack()
    self.parallel_load = parallel_load
    self.storage = StorageManager(self.manifest.storage)
    self.connection_registry = ConnectionRegistry()
    self.mcp = MCPManager(
        name="pool_mcp", servers=self.manifest.get_mcp_servers(), owner="pool"
    )
    self._tasks = TaskRegistry()
    # Register tasks from manifest
    for name, task in self.manifest.jobs.items():
        self._tasks.register(name, task)
    self.pool_talk = TeamTalk[Any].from_nodes(list(self.nodes.values()))
    if self.manifest.pool_server and self.manifest.pool_server.enabled:
        from llmling_agent.resource_providers.pool import PoolResourceProvider
        from llmling_agent_mcp.server import LLMLingServer

        provider = PoolResourceProvider(
            self, zed_mode=self.manifest.pool_server.zed_mode
        )
        self.server: LLMLingServer | None = LLMLingServer(
            provider=provider,
            config=self.manifest.pool_server,
        )
    else:
        self.server = None
    # Create requested agents immediately
    for name in self.manifest.agents:
        agent = self.manifest.get_agent(name, deps=shared_deps)
        self.register(name, agent)

    # Then set up worker relationships
    for agent in self.agents.values():
        self.setup_agent_workers(agent)
    self._create_teams()
    # Set up forwarding connections
    if connect_nodes:
        self._connect_nodes()

_connect_nodes

_connect_nodes()

Set up connections defined in manifest.

Source code in src/llmling_agent/delegation/pool.py
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
def _connect_nodes(self):
    """Set up connections defined in manifest."""
    # Merge agent and team configs into one dict of nodes with connections
    for name, config in self.manifest.nodes.items():
        source = self[name]
        for target in config.connections or []:
            match target:
                case NodeConnectionConfig():
                    if target.name not in self:
                        msg = f"Forward target {target.name} not found for {name}"
                        raise ValueError(msg)
                    target_node = self[target.name]
                case FileConnectionConfig() | CallableConnectionConfig():
                    target_node = Agent(provider=target.get_provider())
                case _:
                    msg = f"Invalid connection config: {target}"
                    raise ValueError(msg)

            source.connect_to(
                target_node,  # type: ignore  # recognized as "Any | BaseTeam[Any, Any]" by mypy?
                connection_type=target.connection_type,
                name=name,
                priority=target.priority,
                delay=target.delay,
                queued=target.queued,
                queue_strategy=target.queue_strategy,
                transform=target.transform,
                filter_condition=target.filter_condition.check
                if target.filter_condition
                else None,
                stop_condition=target.stop_condition.check
                if target.stop_condition
                else None,
                exit_condition=target.exit_condition.check
                if target.exit_condition
                else None,
            )
            source.connections.set_wait_state(
                target_node,
                wait=target.wait_for_completion,
            )

_create_teams

_create_teams()

Create all teams in two phases to allow nesting.

Source code in src/llmling_agent/delegation/pool.py
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
def _create_teams(self):
    """Create all teams in two phases to allow nesting."""
    # Phase 1: Create empty teams

    empty_teams: dict[str, BaseTeam[Any, Any]] = {}
    for name, config in self.manifest.teams.items():
        if config.mode == "parallel":
            empty_teams[name] = Team(
                [], name=name, shared_prompt=config.shared_prompt
            )
        else:
            empty_teams[name] = TeamRun(
                [], name=name, shared_prompt=config.shared_prompt
            )

    # Phase 2: Resolve members
    for name, config in self.manifest.teams.items():
        team = empty_teams[name]
        members: list[MessageNode[Any, Any]] = []
        for member in config.members:
            if member in self.agents:
                members.append(self.agents[member])
            elif member in empty_teams:
                members.append(empty_teams[member])
            else:
                msg = f"Unknown team member: {member}"
                raise ValueError(msg)
        team.agents.extend(members)
        self[name] = team

_validate_item

_validate_item(item: MessageEmitter[Any, Any] | Any) -> MessageEmitter[Any, Any]

Validate and convert items before registration.

Parameters:

Name Type Description Default
item MessageEmitter[Any, Any] | Any

Item to validate

required

Returns:

Type Description
MessageEmitter[Any, Any]

Validated Node

Raises:

Type Description
LLMlingError

If item is not a valid node

Source code in src/llmling_agent/delegation/pool.py
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
def _validate_item(
    self, item: MessageEmitter[Any, Any] | Any
) -> MessageEmitter[Any, Any]:
    """Validate and convert items before registration.

    Args:
        item: Item to validate

    Returns:
        Validated Node

    Raises:
        LLMlingError: If item is not a valid node
    """
    if not isinstance(item, MessageEmitter):
        msg = f"Item must be Agent or Team, got {type(item)}"
        raise self._error_class(msg)
    item.context.pool = self
    return item

add_agent async

add_agent(
    name: AgentName, *, result_type: None = None, **kwargs: Unpack[AgentKwargs]
) -> Agent[Any]
add_agent(
    name: AgentName,
    *,
    result_type: type[TResult] | str | ResponseDefinition,
    **kwargs: Unpack[AgentKwargs],
) -> StructuredAgent[Any, TResult]
add_agent(
    name: AgentName,
    *,
    result_type: type[Any] | str | ResponseDefinition | None = None,
    **kwargs: Unpack[AgentKwargs],
) -> Agent[Any] | StructuredAgent[Any, Any]

Add a new permanent agent to the pool.

Parameters:

Name Type Description Default
name AgentName

Name for the new agent

required
result_type type[Any] | str | ResponseDefinition | None

Optional type for structured responses: - None: Regular unstructured agent - type: Python type for validation - str: Name of response definition - ResponseDefinition: Complete response definition

None
**kwargs Unpack[AgentKwargs]

Additional agent configuration

{}

Returns:

Type Description
Agent[Any] | StructuredAgent[Any, Any]

Either a regular Agent or StructuredAgent depending on result_type

Source code in src/llmling_agent/delegation/pool.py
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
async def add_agent(
    self,
    name: AgentName,
    *,
    result_type: type[Any] | str | ResponseDefinition | None = None,
    **kwargs: Unpack[AgentKwargs],
) -> Agent[Any] | StructuredAgent[Any, Any]:
    """Add a new permanent agent to the pool.

    Args:
        name: Name for the new agent
        result_type: Optional type for structured responses:
            - None: Regular unstructured agent
            - type: Python type for validation
            - str: Name of response definition
            - ResponseDefinition: Complete response definition
        **kwargs: Additional agent configuration

    Returns:
        Either a regular Agent or StructuredAgent depending on result_type
    """
    from llmling_agent.agent import Agent

    agent: AnyAgent[Any, Any] = Agent(name=name, **kwargs)
    agent.tools.add_provider(self.mcp)
    agent = await self.exit_stack.enter_async_context(agent)
    # Convert to structured if needed
    if result_type is not None:
        agent = agent.to_structured(result_type)
    self.register(name, agent)
    return agent

cleanup async

cleanup()

Clean up all agents.

Source code in src/llmling_agent/delegation/pool.py
201
202
203
204
async def cleanup(self):
    """Clean up all agents."""
    await self.exit_stack.aclose()
    self.clear()

clone_agent async

clone_agent(
    agent: AgentName | Agent[TDeps],
    new_name: AgentName | None = None,
    *,
    system_prompts: list[str] | None = None,
    template_context: dict[str, Any] | None = None,
) -> Agent[TDeps]
clone_agent(
    agent: StructuredAgent[TDeps, TResult],
    new_name: AgentName | None = None,
    *,
    system_prompts: list[str] | None = None,
    template_context: dict[str, Any] | None = None,
) -> StructuredAgent[TDeps, TResult]
clone_agent(
    agent: AgentName | AnyAgent[TDeps, TAgentResult],
    new_name: AgentName | None = None,
    *,
    system_prompts: list[str] | None = None,
    template_context: dict[str, Any] | None = None,
) -> AnyAgent[TDeps, TAgentResult]

Create a copy of an agent.

Parameters:

Name Type Description Default
agent AgentName | AnyAgent[TDeps, TAgentResult]

Agent instance or name to clone

required
new_name AgentName | None

Optional name for the clone

None
system_prompts list[str] | None

Optional different prompts

None
template_context dict[str, Any] | None

Variables for template rendering

None

Returns:

Type Description
AnyAgent[TDeps, TAgentResult]

The new agent instance

Source code in src/llmling_agent/delegation/pool.py
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
async def clone_agent[TDeps, TAgentResult](
    self,
    agent: AgentName | AnyAgent[TDeps, TAgentResult],
    new_name: AgentName | None = None,
    *,
    system_prompts: list[str] | None = None,
    template_context: dict[str, Any] | None = None,
) -> AnyAgent[TDeps, TAgentResult]:
    """Create a copy of an agent.

    Args:
        agent: Agent instance or name to clone
        new_name: Optional name for the clone
        system_prompts: Optional different prompts
        template_context: Variables for template rendering

    Returns:
        The new agent instance
    """
    from llmling_agent.agent import Agent, StructuredAgent

    # Get original config
    if isinstance(agent, str):
        if agent not in self.manifest.agents:
            msg = f"Agent {agent} not found"
            raise KeyError(msg)
        config = self.manifest.agents[agent]
        original_agent: AnyAgent[Any, Any] = self.get_agent(agent)
    else:
        config = agent.context.config  # type: ignore
        original_agent = agent

    # Create new config
    new_config = config.model_copy(deep=True)

    # Apply overrides
    if system_prompts:
        new_config.system_prompts = system_prompts

    # Handle template rendering
    if template_context:
        new_config.system_prompts = new_config.render_system_prompts(template_context)

    # Create new agent with same runtime
    new_agent = Agent[TDeps](
        runtime=original_agent.runtime,
        context=original_agent.context,
        # result_type=original_agent.actual_type,
        provider=new_config.get_provider(),
        system_prompt=new_config.system_prompts,
        name=new_name or f"{config.name}_copy_{len(self.agents)}",
    )
    if isinstance(original_agent, StructuredAgent):
        new_agent = new_agent.to_structured(original_agent.actual_type)

    # Register in pool
    agent_name = new_agent.name
    self.manifest.agents[agent_name] = new_config
    self.agents[agent_name] = new_agent
    return await self.exit_stack.enter_async_context(new_agent)

create_agent async

create_agent(
    name: AgentName,
    *,
    session: SessionIdType | SessionQuery = None,
    name_override: str | None = None,
) -> Agent[TPoolDeps]
create_agent(
    name: AgentName,
    *,
    deps: TCustomDeps,
    session: SessionIdType | SessionQuery = None,
    name_override: str | None = None,
) -> Agent[TCustomDeps]
create_agent(
    name: AgentName,
    *,
    return_type: type[TResult],
    session: SessionIdType | SessionQuery = None,
    name_override: str | None = None,
) -> StructuredAgent[TPoolDeps, TResult]
create_agent(
    name: AgentName,
    *,
    deps: TCustomDeps,
    return_type: type[TResult],
    session: SessionIdType | SessionQuery = None,
    name_override: str | None = None,
) -> StructuredAgent[TCustomDeps, TResult]
create_agent(
    name: AgentName,
    *,
    deps: Any | None = None,
    return_type: Any | None = None,
    session: SessionIdType | SessionQuery = None,
    name_override: str | None = None,
) -> AnyAgent[Any, Any]

Create a new agent instance from configuration.

Parameters:

Name Type Description Default
name AgentName

Name of the agent configuration to use

required
deps Any | None

Optional custom dependencies (overrides pool deps)

None
return_type Any | None

Optional type for structured responses

None
session SessionIdType | SessionQuery

Optional session ID or query to recover conversation

None
name_override str | None

Optional different name for this instance

None

Returns:

Type Description
AnyAgent[Any, Any]

New agent instance with the specified configuration

Raises:

Type Description
KeyError

If agent configuration not found

ValueError

If configuration is invalid

Source code in src/llmling_agent/delegation/pool.py
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
async def create_agent(
    self,
    name: AgentName,
    *,
    deps: Any | None = None,
    return_type: Any | None = None,
    session: SessionIdType | SessionQuery = None,
    name_override: str | None = None,
) -> AnyAgent[Any, Any]:
    """Create a new agent instance from configuration.

    Args:
        name: Name of the agent configuration to use
        deps: Optional custom dependencies (overrides pool deps)
        return_type: Optional type for structured responses
        session: Optional session ID or query to recover conversation
        name_override: Optional different name for this instance

    Returns:
        New agent instance with the specified configuration

    Raises:
        KeyError: If agent configuration not found
        ValueError: If configuration is invalid
    """
    if name not in self.manifest.agents:
        msg = f"Agent configuration {name!r} not found"
        raise KeyError(msg)

    # Use Manifest.get_agent for proper initialization
    final_deps = deps if deps is not None else self.shared_deps
    agent = self.manifest.get_agent(name, deps=final_deps)
    # Override name if requested
    if name_override:
        agent.name = name_override

    # Set pool reference
    agent.context.pool = self

    # Handle session if provided
    if session:
        agent.conversation.load_history_from_database(session=session)

    # Initialize agent through exit stack
    agent = await self.exit_stack.enter_async_context(agent)

    # Override structured configuration if provided
    if return_type is not None:
        return agent.to_structured(return_type)

    return agent

create_team

create_team(agents: Sequence[str]) -> Team[TPoolDeps]
create_team(
    agents: Sequence[MessageNode[TDeps, Any]],
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: AnyAgent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None,
) -> Team[TDeps]
create_team(
    agents: Sequence[AgentName | MessageNode[Any, Any]],
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: AnyAgent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None,
) -> Team[Any]
create_team(
    agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: AnyAgent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None,
) -> Team[Any]

Create a group from agent names or instances.

Parameters:

Name Type Description Default
agents Sequence[AgentName | MessageNode[Any, Any]] | None

List of agent names or instances (all if None)

None
name str | None

Optional name for the team

None
description str | None

Optional description for the team

None
shared_prompt str | None

Optional prompt for all agents

None
picker AnyAgent[Any, Any] | None

Agent to use for picking agents

None
num_picks int | None

Number of agents to pick

None
pick_prompt str | None

Prompt to use for picking agents

None
Source code in src/llmling_agent/delegation/pool.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
def create_team(
    self,
    agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: AnyAgent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None,
) -> Team[Any]:
    """Create a group from agent names or instances.

    Args:
        agents: List of agent names or instances (all if None)
        name: Optional name for the team
        description: Optional description for the team
        shared_prompt: Optional prompt for all agents
        picker: Agent to use for picking agents
        num_picks: Number of agents to pick
        pick_prompt: Prompt to use for picking agents
    """
    from llmling_agent.delegation.team import Team

    if agents is None:
        agents = list(self.agents.keys())

    # First resolve/configure agents
    resolved_agents: list[MessageNode[Any, Any]] = []
    for agent in agents:
        if isinstance(agent, str):
            agent = self.get_agent(agent)
        resolved_agents.append(agent)

    team = Team(
        name=name,
        description=description,
        agents=resolved_agents,
        shared_prompt=shared_prompt,
        picker=picker,
        num_picks=num_picks,
        pick_prompt=pick_prompt,
    )
    if name:
        self[name] = team
    return team

create_team_run

create_team_run(
    agents: Sequence[str],
    validator: MessageNode[Any, TResult] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: AnyAgent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None,
) -> TeamRun[TPoolDeps, TResult]
create_team_run(
    agents: Sequence[MessageNode[TDeps, Any]],
    validator: MessageNode[Any, TResult] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: AnyAgent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None,
) -> TeamRun[TDeps, TResult]
create_team_run(
    agents: Sequence[AgentName | MessageNode[Any, Any]],
    validator: MessageNode[Any, TResult] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: AnyAgent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None,
) -> TeamRun[Any, TResult]
create_team_run(
    agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
    validator: MessageNode[Any, TResult] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: AnyAgent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None,
) -> TeamRun[Any, TResult]

Create a a sequential TeamRun from a list of Agents.

Parameters:

Name Type Description Default
agents Sequence[AgentName | MessageNode[Any, Any]] | None

List of agent names or team/agent instances (all if None)

None
validator MessageNode[Any, TResult] | None

Node to validate the results of the TeamRun

None
name str | None

Optional name for the team

None
description str | None

Optional description for the team

None
shared_prompt str | None

Optional prompt for all agents

None
picker AnyAgent[Any, Any] | None

Agent to use for picking agents

None
num_picks int | None

Number of agents to pick

None
pick_prompt str | None

Prompt to use for picking agents

None
Source code in src/llmling_agent/delegation/pool.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def create_team_run(
    self,
    agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
    validator: MessageNode[Any, TResult] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    shared_prompt: str | None = None,
    picker: AnyAgent[Any, Any] | None = None,
    num_picks: int | None = None,
    pick_prompt: str | None = None,
) -> TeamRun[Any, TResult]:
    """Create a a sequential TeamRun from a list of Agents.

    Args:
        agents: List of agent names or team/agent instances (all if None)
        validator: Node to validate the results of the TeamRun
        name: Optional name for the team
        description: Optional description for the team
        shared_prompt: Optional prompt for all agents
        picker: Agent to use for picking agents
        num_picks: Number of agents to pick
        pick_prompt: Prompt to use for picking agents
    """
    from llmling_agent.delegation.teamrun import TeamRun

    if agents is None:
        agents = list(self.agents.keys())

    # First resolve/configure agents
    resolved_agents: list[MessageNode[Any, Any]] = []
    for agent in agents:
        if isinstance(agent, str):
            agent = self.get_agent(agent)
        resolved_agents.append(agent)
    team = TeamRun(
        resolved_agents,
        name=name,
        description=description,
        validator=validator,
        shared_prompt=shared_prompt,
        picker=picker,
        num_picks=num_picks,
        pick_prompt=pick_prompt,
    )
    if name:
        self[name] = team
    return team

get_agent

get_agent(
    agent: AgentName | Agent[Any],
    *,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
) -> Agent[TPoolDeps]
get_agent(
    agent: AgentName | Agent[Any],
    *,
    return_type: type[TResult],
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
) -> StructuredAgent[TPoolDeps, TResult]
get_agent(
    agent: AgentName | Agent[Any],
    *,
    deps: TCustomDeps,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
) -> Agent[TCustomDeps]
get_agent(
    agent: AgentName | Agent[Any],
    *,
    deps: TCustomDeps,
    return_type: type[TResult],
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
) -> StructuredAgent[TCustomDeps, TResult]
get_agent(
    agent: AgentName | Agent[Any],
    *,
    deps: Any | None = None,
    return_type: Any | None = None,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
) -> AnyAgent[Any, Any]

Get or configure an agent from the pool.

This method provides flexible agent configuration with dependency injection: - Without deps: Agent uses pool's shared dependencies - With deps: Agent uses provided custom dependencies - With return_type: Returns a StructuredAgent with type validation

Parameters:

Name Type Description Default
agent AgentName | Agent[Any]

Either agent name or instance

required
deps Any | None

Optional custom dependencies (overrides shared deps)

None
return_type Any | None

Optional type for structured responses

None
model_override str | None

Optional model override

None
session SessionIdType | SessionQuery

Optional session ID or query to recover conversation

None

Returns:

Name Type Description
Either AnyAgent[Any, Any]
AnyAgent[Any, Any]
  • Agent[TPoolDeps] when using pool's shared deps
AnyAgent[Any, Any]
  • Agent[TCustomDeps] when custom deps provided
AnyAgent[Any, Any]
  • StructuredAgent when return_type provided

Raises:

Type Description
KeyError

If agent name not found

ValueError

If configuration is invalid

Source code in src/llmling_agent/delegation/pool.py
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
def get_agent(
    self,
    agent: AgentName | Agent[Any],
    *,
    deps: Any | None = None,
    return_type: Any | None = None,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
) -> AnyAgent[Any, Any]:
    """Get or configure an agent from the pool.

    This method provides flexible agent configuration with dependency injection:
    - Without deps: Agent uses pool's shared dependencies
    - With deps: Agent uses provided custom dependencies
    - With return_type: Returns a StructuredAgent with type validation

    Args:
        agent: Either agent name or instance
        deps: Optional custom dependencies (overrides shared deps)
        return_type: Optional type for structured responses
        model_override: Optional model override
        session: Optional session ID or query to recover conversation

    Returns:
        Either:
        - Agent[TPoolDeps] when using pool's shared deps
        - Agent[TCustomDeps] when custom deps provided
        - StructuredAgent when return_type provided

    Raises:
        KeyError: If agent name not found
        ValueError: If configuration is invalid
    """
    from llmling_agent.agent import Agent
    from llmling_agent.agent.context import AgentContext

    # Get base agent
    base = agent if isinstance(agent, Agent) else self.agents[agent]

    # Setup context and dependencies
    if base.context is None:
        base.context = AgentContext[Any].create_default(base.name)

    # Use custom deps if provided, otherwise use shared deps
    base.context.data = deps if deps is not None else self.shared_deps
    base.context.pool = self

    # Apply overrides
    if model_override:
        base.set_model(model_override)

    if session:
        base.conversation.load_history_from_database(session=session)

    # Convert to structured if needed
    if return_type is not None:
        return base.to_structured(return_type)

    return base

get_mermaid_diagram

get_mermaid_diagram(include_details: bool = True) -> str

Generate mermaid flowchart of all agents and their connections.

Parameters:

Name Type Description Default
include_details bool

Whether to show connection details (types, queues, etc)

True
Source code in src/llmling_agent/delegation/pool.py
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
def get_mermaid_diagram(
    self,
    include_details: bool = True,
) -> str:
    """Generate mermaid flowchart of all agents and their connections.

    Args:
        include_details: Whether to show connection details (types, queues, etc)
    """
    lines = ["flowchart LR"]

    # Add all agents as nodes
    for name in self.agents:
        lines.append(f"    {name}[{name}]")  # noqa: PERF401

    # Add all connections as edges
    for agent in self.agents.values():
        connections = agent.connections.get_connections()
        for talk in connections:
            talk = cast(Talk[Any], talk)  # help mypy understand it's a Talk
            source = talk.source.name
            for target in talk.targets:
                if include_details:
                    details: list[str] = []
                    details.append(talk.connection_type)
                    if talk.queued:
                        details.append(f"queued({talk.queue_strategy})")
                    if fn := talk.filter_condition:  # type: ignore
                        details.append(f"filter:{fn.__name__}")
                    if fn := talk.stop_condition:  # type: ignore
                        details.append(f"stop:{fn.__name__}")
                    if fn := talk.exit_condition:  # type: ignore
                        details.append(f"exit:{fn.__name__}")

                    label = f"|{' '.join(details)}|" if details else ""
                    lines.append(f"    {source}--{label}-->{target.name}")
                else:
                    lines.append(f"    {source}-->{target.name}")

    return "\n".join(lines)

list_nodes

list_nodes() -> list[str]

List available agent names.

Source code in src/llmling_agent/delegation/pool.py
827
828
829
def list_nodes(self) -> list[str]:
    """List available agent names."""
    return list(self.list_items())

run_event_loop async

run_event_loop()

Run pool in event-watching mode until interrupted.

Source code in src/llmling_agent/delegation/pool.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
async def run_event_loop(self):
    """Run pool in event-watching mode until interrupted."""
    import sys

    print("Starting event watch mode...")
    print("Active nodes: ", ", ".join(self.list_nodes()))
    print("Press Ctrl+C to stop")

    stop_event = asyncio.Event()

    if sys.platform != "win32":
        # Unix: Use signal handlers
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, stop_event.set)
        while True:
            await asyncio.sleep(1)
    else:
        # Windows: Use keyboard interrupt
        with suppress(KeyboardInterrupt):
            while True:
                await asyncio.sleep(1)

setup_agent_workers

setup_agent_workers(agent: AnyAgent[Any, Any])

Set up workers for an agent from configuration.

Source code in src/llmling_agent/delegation/pool.py
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
def setup_agent_workers(self, agent: AnyAgent[Any, Any]):
    """Set up workers for an agent from configuration."""
    for worker_config in agent.context.config.workers:
        try:
            worker = self.get_agent(worker_config.name)
            agent.register_worker(
                worker,
                name=worker_config.name,
                reset_history_on_run=worker_config.reset_history_on_run,
                pass_message_history=worker_config.pass_message_history,
                share_context=worker_config.share_context,
            )
        except KeyError as e:
            msg = f"Worker agent {worker_config.name!r} not found"
            raise ValueError(msg) from e

track_message_flow async

track_message_flow() -> AsyncIterator[MessageFlowTracker]

Track message flow during a context.

Source code in src/llmling_agent/delegation/pool.py
373
374
375
376
377
378
379
380
381
@asynccontextmanager
async def track_message_flow(self) -> AsyncIterator[MessageFlowTracker]:
    """Track message flow during a context."""
    tracker = MessageFlowTracker()
    self.connection_registry.message_flow.connect(tracker.track)
    try:
        yield tracker
    finally:
        self.connection_registry.message_flow.disconnect(tracker.track)

Show source on GitHub