
delegation

Class info

Classes

AgentPool (llmling_agent.delegation.pool): Pool managing message processing nodes (agents and teams).
BaseTeam (llmling_agent.delegation.base_team): Base class for Team and TeamRun.
    Team (llmling_agent.delegation.team): Group of agents that can execute together.
        TeamRun (llmling_agent.delegation.teamrun): Handles team operations with monitoring.

        DocStrings

        Agent delegation and collaboration functionality.

        AgentPool

        Bases: BaseRegistry[NodeName, MessageEmitter[Any, Any]]

        Pool managing message processing nodes (agents and teams).

        Acts as a unified registry for all nodes, providing:

        - Centralized node management and lookup
        - Shared dependency injection
        - Connection management
        - Resource coordination

        Nodes can be accessed through:

        - nodes: All registered nodes (agents and teams)
        - agents: Only Agent instances
        - teams: Only Team instances
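        The registry design described above (one shared store, typed views) can be sketched in a few lines of plain Python. MiniPool, DummyAgent, and DummyTeam are illustrative stand-ins, not llmling_agent classes; the real AgentPool exposes its nodes through isinstance-filtered properties in the same spirit:

```python
# Sketch of the unified-registry pattern: every node lives in one dict,
# and typed views (agents, teams) filter it by isinstance.
# DummyAgent, DummyTeam, and MiniPool are stand-ins for illustration only.

class DummyAgent:
    def __init__(self, name: str):
        self.name = name


class DummyTeam:
    def __init__(self, name: str):
        self.name = name


class MiniPool:
    def __init__(self):
        self._items: dict[str, object] = {}

    def register(self, name: str, node: object) -> None:
        self._items[name] = node

    @property
    def nodes(self) -> dict[str, object]:
        """All registered nodes (agents and teams)."""
        return dict(self._items)

    @property
    def agents(self) -> dict[str, DummyAgent]:
        """Only agent instances."""
        return {n: i for n, i in self._items.items() if isinstance(i, DummyAgent)}

    @property
    def teams(self) -> dict[str, DummyTeam]:
        """Only team instances."""
        return {n: i for n, i in self._items.items() if isinstance(i, DummyTeam)}


pool = MiniPool()
pool.register("analyzer", DummyAgent("analyzer"))
pool.register("writers", DummyTeam("writers"))
print(sorted(pool.nodes))   # ['analyzer', 'writers']
print(sorted(pool.agents))  # ['analyzer']
```

        Because all views derive from the single `_items` dict, registering a node once makes it visible everywhere, which is what lets the pool act as one lookup point for both agents and teams.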

        Source code in src/llmling_agent/delegation/pool.py
        class AgentPool[TPoolDeps](BaseRegistry[NodeName, MessageEmitter[Any, Any]]):
            """Pool managing message processing nodes (agents and teams).
        
            Acts as a unified registry for all nodes, providing:
            - Centralized node management and lookup
            - Shared dependency injection
            - Connection management
            - Resource coordination
        
            Nodes can be accessed through:
            - nodes: All registered nodes (agents and teams)
            - agents: Only Agent instances
            - teams: Only Team instances
            """
        
            def __init__(
                self,
                manifest: StrPath | AgentsManifest | None = None,
                *,
                shared_deps: TPoolDeps | None = None,
                connect_nodes: bool = True,
                input_provider: InputProvider | None = None,
                parallel_load: bool = True,
            ):
                """Initialize agent pool with immediate agent creation.
        
                Args:
                    manifest: Agent configuration manifest
                    shared_deps: Dependencies to share across all nodes
                    connect_nodes: Whether to set up forwarding connections
                    input_provider: Input provider for tool / step confirmations / HumanAgents
                    parallel_load: Whether to load nodes in parallel (async)
        
                Raises:
                    ValueError: If manifest contains invalid node configurations
                    RuntimeError: If node initialization fails
                """
                super().__init__()
                from llmling_agent.models.manifest import AgentsManifest
                from llmling_agent.storage import StorageManager
        
                match manifest:
                    case None:
                        self.manifest = AgentsManifest()
                    case str():
                        self.manifest = AgentsManifest.from_file(manifest)
                    case AgentsManifest():
                        self.manifest = manifest
                    case _:
                        msg = f"Invalid config path: {manifest}"
                        raise ValueError(msg)
                self.shared_deps = shared_deps
                self._input_provider = input_provider
                self.exit_stack = AsyncExitStack()
                self.parallel_load = parallel_load
                self.storage = StorageManager(self.manifest.storage)
                self.connection_registry = ConnectionRegistry()
                self.mcp = MCPManager(
                    name="pool_mcp", servers=self.manifest.get_mcp_servers(), owner="pool"
                )
                self._tasks = TaskRegistry()
                # Register tasks from manifest
                for name, task in self.manifest.jobs.items():
                    self._tasks.register(name, task)
                self.pool_talk = TeamTalk[Any].from_nodes(list(self.nodes.values()))
                if self.manifest.pool_server and self.manifest.pool_server.enabled:
                    from llmling_agent.resource_providers.pool import PoolResourceProvider
                    from llmling_agent_mcp.server import LLMLingServer
        
                    provider = PoolResourceProvider(
                        self, zed_mode=self.manifest.pool_server.zed_mode
                    )
                    self.server: LLMLingServer | None = LLMLingServer(
                        provider=provider,
                        config=self.manifest.pool_server,
                    )
                else:
                    self.server = None
                # Create requested agents immediately
                for name in self.manifest.agents:
                    agent = self.manifest.get_agent(name, deps=shared_deps)
                    self.register(name, agent)
        
                # Then set up worker relationships
                for agent in self.agents.values():
                    self.setup_agent_workers(agent)
                self._create_teams()
                # Set up forwarding connections
                if connect_nodes:
                    self._connect_nodes()
        
            async def __aenter__(self) -> Self:
                """Enter async context and initialize all agents."""
                try:
                    # Add MCP tool provider to all agents
                    agents = list(self.agents.values())
                    teams = list(self.teams.values())
                    for agent in agents:
                        agent.tools.add_provider(self.mcp)
        
                    # Collect all components to initialize
                    components: list[AbstractAsyncContextManager[Any]] = [
                        self.mcp,
                        *agents,
                        *teams,
                    ]
        
                    # Add MCP server if configured
                    if self.server:
                        components.append(self.server)
        
                    # Initialize all components
                    if self.parallel_load:
                        await asyncio.gather(
                            *(self.exit_stack.enter_async_context(c) for c in components)
                        )
                    else:
                        for component in components:
                            await self.exit_stack.enter_async_context(component)
        
                except Exception as e:
                    await self.cleanup()
                    msg = "Failed to initialize agent pool"
                    logger.exception(msg, exc_info=e)
                    raise RuntimeError(msg) from e
                return self
        
            async def __aexit__(
                self,
                exc_type: type[BaseException] | None,
                exc_val: BaseException | None,
                exc_tb: TracebackType | None,
            ):
                """Exit async context."""
                # Remove MCP tool provider from all agents
                for agent in self.agents.values():
                    if self.mcp in agent.tools.providers:
                        agent.tools.remove_provider(self.mcp)
                await self.cleanup()
        
            async def cleanup(self):
                """Clean up all agents."""
                await self.exit_stack.aclose()
                self.clear()
        
            @overload
            def create_team_run(
                self,
                agents: Sequence[str],
                validator: MessageNode[Any, TResult] | None = None,
                *,
                name: str | None = None,
                description: str | None = None,
                shared_prompt: str | None = None,
                picker: AnyAgent[Any, Any] | None = None,
                num_picks: int | None = None,
                pick_prompt: str | None = None,
            ) -> TeamRun[TPoolDeps, TResult]: ...
        
            @overload
            def create_team_run[TDeps, TResult](
                self,
                agents: Sequence[MessageNode[TDeps, Any]],
                validator: MessageNode[Any, TResult] | None = None,
                *,
                name: str | None = None,
                description: str | None = None,
                shared_prompt: str | None = None,
                picker: AnyAgent[Any, Any] | None = None,
                num_picks: int | None = None,
                pick_prompt: str | None = None,
            ) -> TeamRun[TDeps, TResult]: ...
        
            @overload
            def create_team_run(
                self,
                agents: Sequence[AgentName | MessageNode[Any, Any]],
                validator: MessageNode[Any, TResult] | None = None,
                *,
                name: str | None = None,
                description: str | None = None,
                shared_prompt: str | None = None,
                picker: AnyAgent[Any, Any] | None = None,
                num_picks: int | None = None,
                pick_prompt: str | None = None,
            ) -> TeamRun[Any, TResult]: ...
        
            def create_team_run(
                self,
                agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
                validator: MessageNode[Any, TResult] | None = None,
                *,
                name: str | None = None,
                description: str | None = None,
                shared_prompt: str | None = None,
                picker: AnyAgent[Any, Any] | None = None,
                num_picks: int | None = None,
                pick_prompt: str | None = None,
            ) -> TeamRun[Any, TResult]:
                """Create a a sequential TeamRun from a list of Agents.
        
                Args:
                    agents: List of agent names or team/agent instances (all if None)
                    validator: Node to validate the results of the TeamRun
                    name: Optional name for the team
                    description: Optional description for the team
                    shared_prompt: Optional prompt for all agents
                    picker: Agent to use for picking agents
                    num_picks: Number of agents to pick
                    pick_prompt: Prompt to use for picking agents
                """
                from llmling_agent.delegation.teamrun import TeamRun
        
                if agents is None:
                    agents = list(self.agents.keys())
        
                # First resolve/configure agents
                resolved_agents: list[MessageNode[Any, Any]] = []
                for agent in agents:
                    if isinstance(agent, str):
                        agent = self.get_agent(agent)
                    resolved_agents.append(agent)
                team = TeamRun(
                    resolved_agents,
                    name=name,
                    description=description,
                    validator=validator,
                    shared_prompt=shared_prompt,
                    picker=picker,
                    num_picks=num_picks,
                    pick_prompt=pick_prompt,
                )
                if name:
                    self[name] = team
                return team
        
            @overload
            def create_team(self, agents: Sequence[str]) -> Team[TPoolDeps]: ...
        
            @overload
            def create_team[TDeps](
                self,
                agents: Sequence[MessageNode[TDeps, Any]],
                *,
                name: str | None = None,
                description: str | None = None,
                shared_prompt: str | None = None,
                picker: AnyAgent[Any, Any] | None = None,
                num_picks: int | None = None,
                pick_prompt: str | None = None,
            ) -> Team[TDeps]: ...
        
            @overload
            def create_team(
                self,
                agents: Sequence[AgentName | MessageNode[Any, Any]],
                *,
                name: str | None = None,
                description: str | None = None,
                shared_prompt: str | None = None,
                picker: AnyAgent[Any, Any] | None = None,
                num_picks: int | None = None,
                pick_prompt: str | None = None,
            ) -> Team[Any]: ...
        
            def create_team(
                self,
                agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
                *,
                name: str | None = None,
                description: str | None = None,
                shared_prompt: str | None = None,
                picker: AnyAgent[Any, Any] | None = None,
                num_picks: int | None = None,
                pick_prompt: str | None = None,
            ) -> Team[Any]:
                """Create a group from agent names or instances.
        
                Args:
                    agents: List of agent names or instances (all if None)
                    name: Optional name for the team
                    description: Optional description for the team
                    shared_prompt: Optional prompt for all agents
                    picker: Agent to use for picking agents
                    num_picks: Number of agents to pick
                    pick_prompt: Prompt to use for picking agents
                """
                from llmling_agent.delegation.team import Team
        
                if agents is None:
                    agents = list(self.agents.keys())
        
                # First resolve/configure agents
                resolved_agents: list[MessageNode[Any, Any]] = []
                for agent in agents:
                    if isinstance(agent, str):
                        agent = self.get_agent(agent)
                    resolved_agents.append(agent)
        
                team = Team(
                    name=name,
                    description=description,
                    agents=resolved_agents,
                    shared_prompt=shared_prompt,
                    picker=picker,
                    num_picks=num_picks,
                    pick_prompt=pick_prompt,
                )
                if name:
                    self[name] = team
                return team
        
            @asynccontextmanager
            async def track_message_flow(self) -> AsyncIterator[MessageFlowTracker]:
                """Track message flow during a context."""
                tracker = MessageFlowTracker()
                self.connection_registry.message_flow.connect(tracker.track)
                try:
                    yield tracker
                finally:
                    self.connection_registry.message_flow.disconnect(tracker.track)
        
            async def run_event_loop(self):
                """Run pool in event-watching mode until interrupted."""
                import sys
        
                print("Starting event watch mode...")
                print("Active nodes: ", ", ".join(self.list_nodes()))
                print("Press Ctrl+C to stop")
        
                stop_event = asyncio.Event()
        
                if sys.platform != "win32":
                    # Unix: Use signal handlers
                    loop = asyncio.get_running_loop()
                    for sig in (signal.SIGINT, signal.SIGTERM):
                        loop.add_signal_handler(sig, stop_event.set)
                    await stop_event.wait()
                else:
                    # Windows: Use keyboard interrupt
                    with suppress(KeyboardInterrupt):
                        while True:
                            await asyncio.sleep(1)
        
            @property
            def agents(self) -> dict[str, AnyAgent[Any, Any]]:
                """Get agents dict (backward compatibility)."""
                return {
                    i.name: i
                    for i in self._items.values()
                    if isinstance(i, Agent | StructuredAgent)
                }
        
            @property
            def teams(self) -> dict[str, BaseTeam[Any, Any]]:
                """Get agents dict (backward compatibility)."""
                from llmling_agent.delegation.base_team import BaseTeam
        
                return {i.name: i for i in self._items.values() if isinstance(i, BaseTeam)}
        
            @property
            def nodes(self) -> dict[str, MessageNode[Any, Any]]:
                """Get agents dict (backward compatibility)."""
                from llmling_agent.messaging.messagenode import MessageNode
        
                return {i.name: i for i in self._items.values() if isinstance(i, MessageNode)}
        
            @property
            def event_nodes(self) -> dict[str, EventNode[Any]]:
                """Get agents dict (backward compatibility)."""
                from llmling_agent.messaging.eventnode import EventNode
        
                return {i.name: i for i in self._items.values() if isinstance(i, EventNode)}
        
            @property
            def node_events(self) -> DictEvents:
                """Get node events."""
                return self._items.events
        
            @property
            def _error_class(self) -> type[LLMLingError]:
                """Error class for agent operations."""
                return LLMLingError
        
            def _validate_item(
                self, item: MessageEmitter[Any, Any] | Any
            ) -> MessageEmitter[Any, Any]:
                """Validate and convert items before registration.
        
                Args:
                    item: Item to validate
        
                Returns:
                    Validated Node
        
                Raises:
                    LLMLingError: If item is not a valid node
                """
                if not isinstance(item, MessageEmitter):
                    msg = f"Item must be Agent or Team, got {type(item)}"
                    raise self._error_class(msg)
                item.context.pool = self
                return item
        
            def _create_teams(self):
                """Create all teams in two phases to allow nesting."""
                # Phase 1: Create empty teams
        
                empty_teams: dict[str, BaseTeam[Any, Any]] = {}
                for name, config in self.manifest.teams.items():
                    if config.mode == "parallel":
                        empty_teams[name] = Team(
                            [], name=name, shared_prompt=config.shared_prompt
                        )
                    else:
                        empty_teams[name] = TeamRun(
                            [], name=name, shared_prompt=config.shared_prompt
                        )
        
                # Phase 2: Resolve members
                for name, config in self.manifest.teams.items():
                    team = empty_teams[name]
                    members: list[MessageNode[Any, Any]] = []
                    for member in config.members:
                        if member in self.agents:
                            members.append(self.agents[member])
                        elif member in empty_teams:
                            members.append(empty_teams[member])
                        else:
                            msg = f"Unknown team member: {member}"
                            raise ValueError(msg)
                    team.agents.extend(members)
                    self[name] = team
        
            def _connect_nodes(self):
                """Set up connections defined in manifest."""
                # Merge agent and team configs into one dict of nodes with connections
                for name, config in self.manifest.nodes.items():
                    source = self[name]
                    for target in config.connections or []:
                        match target:
                            case NodeConnectionConfig():
                                if target.name not in self:
                                    msg = f"Forward target {target.name} not found for {name}"
                                    raise ValueError(msg)
                                target_node = self[target.name]
                            case FileConnectionConfig() | CallableConnectionConfig():
                                target_node = Agent(provider=target.get_provider())
                            case _:
                                msg = f"Invalid connection config: {target}"
                                raise ValueError(msg)
        
                        source.connect_to(
                            target_node,  # type: ignore  # recognized as "Any | BaseTeam[Any, Any]" by mypy?
                            connection_type=target.connection_type,
                            name=name,
                            priority=target.priority,
                            delay=target.delay,
                            queued=target.queued,
                            queue_strategy=target.queue_strategy,
                            transform=target.transform,
                            filter_condition=target.filter_condition.check
                            if target.filter_condition
                            else None,
                            stop_condition=target.stop_condition.check
                            if target.stop_condition
                            else None,
                            exit_condition=target.exit_condition.check
                            if target.exit_condition
                            else None,
                        )
                        source.connections.set_wait_state(
                            target_node,
                            wait=target.wait_for_completion,
                        )
        
            @overload
            async def clone_agent[TDeps](
                self,
                agent: AgentName | Agent[TDeps],
                new_name: AgentName | None = None,
                *,
                system_prompts: list[str] | None = None,
                template_context: dict[str, Any] | None = None,
            ) -> Agent[TDeps]: ...
        
            @overload
            async def clone_agent[TDeps, TResult](
                self,
                agent: StructuredAgent[TDeps, TResult],
                new_name: AgentName | None = None,
                *,
                system_prompts: list[str] | None = None,
                template_context: dict[str, Any] | None = None,
            ) -> StructuredAgent[TDeps, TResult]: ...
        
            async def clone_agent[TDeps, TAgentResult](
                self,
                agent: AgentName | AnyAgent[TDeps, TAgentResult],
                new_name: AgentName | None = None,
                *,
                system_prompts: list[str] | None = None,
                template_context: dict[str, Any] | None = None,
            ) -> AnyAgent[TDeps, TAgentResult]:
                """Create a copy of an agent.
        
                Args:
                    agent: Agent instance or name to clone
                    new_name: Optional name for the clone
                    system_prompts: Optional different prompts
                    template_context: Variables for template rendering
        
                Returns:
                    The new agent instance
                """
                from llmling_agent.agent import Agent, StructuredAgent
        
                # Get original config
                if isinstance(agent, str):
                    if agent not in self.manifest.agents:
                        msg = f"Agent {agent} not found"
                        raise KeyError(msg)
                    config = self.manifest.agents[agent]
                    original_agent: AnyAgent[Any, Any] = self.get_agent(agent)
                else:
                    config = agent.context.config  # type: ignore
                    original_agent = agent
        
                # Create new config
                new_config = config.model_copy(deep=True)
        
                # Apply overrides
                if system_prompts:
                    new_config.system_prompts = system_prompts
        
                # Handle template rendering
                if template_context:
                    new_config.system_prompts = new_config.render_system_prompts(template_context)
        
                # Create new agent with same runtime
                new_agent = Agent[TDeps](
                    runtime=original_agent.runtime,
                    context=original_agent.context,
                    # result_type=original_agent.actual_type,
                    provider=new_config.get_provider(),
                    system_prompt=new_config.system_prompts,
                    name=new_name or f"{config.name}_copy_{len(self.agents)}",
                )
                if isinstance(original_agent, StructuredAgent):
                    new_agent = new_agent.to_structured(original_agent.actual_type)
        
                # Register in pool
                agent_name = new_agent.name
                self.manifest.agents[agent_name] = new_config
                self.agents[agent_name] = new_agent
                return await self.exit_stack.enter_async_context(new_agent)
        
            @overload
            async def create_agent(
                self,
                name: AgentName,
                *,
                session: SessionIdType | SessionQuery = None,
                name_override: str | None = None,
            ) -> Agent[TPoolDeps]: ...
        
            @overload
            async def create_agent[TCustomDeps](
                self,
                name: AgentName,
                *,
                deps: TCustomDeps,
                session: SessionIdType | SessionQuery = None,
                name_override: str | None = None,
            ) -> Agent[TCustomDeps]: ...
        
            @overload
            async def create_agent[TResult](
                self,
                name: AgentName,
                *,
                return_type: type[TResult],
                session: SessionIdType | SessionQuery = None,
                name_override: str | None = None,
            ) -> StructuredAgent[TPoolDeps, TResult]: ...
        
            @overload
            async def create_agent[TCustomDeps, TResult](
                self,
                name: AgentName,
                *,
                deps: TCustomDeps,
                return_type: type[TResult],
                session: SessionIdType | SessionQuery = None,
                name_override: str | None = None,
            ) -> StructuredAgent[TCustomDeps, TResult]: ...
        
            async def create_agent(
                self,
                name: AgentName,
                *,
                deps: Any | None = None,
                return_type: Any | None = None,
                session: SessionIdType | SessionQuery = None,
                name_override: str | None = None,
            ) -> AnyAgent[Any, Any]:
                """Create a new agent instance from configuration.
        
                Args:
                    name: Name of the agent configuration to use
                    deps: Optional custom dependencies (overrides pool deps)
                    return_type: Optional type for structured responses
                    session: Optional session ID or query to recover conversation
                    name_override: Optional different name for this instance
        
                Returns:
                    New agent instance with the specified configuration
        
                Raises:
                    KeyError: If agent configuration not found
                    ValueError: If configuration is invalid
                """
                if name not in self.manifest.agents:
                    msg = f"Agent configuration {name!r} not found"
                    raise KeyError(msg)
        
                # Use Manifest.get_agent for proper initialization
                final_deps = deps if deps is not None else self.shared_deps
                agent = self.manifest.get_agent(name, deps=final_deps)
                # Override name if requested
                if name_override:
                    agent.name = name_override
        
                # Set pool reference
                agent.context.pool = self
        
                # Handle session if provided
                if session:
                    agent.conversation.load_history_from_database(session=session)
        
                # Initialize agent through exit stack
                agent = await self.exit_stack.enter_async_context(agent)
        
                # Override structured configuration if provided
                if return_type is not None:
                    return agent.to_structured(return_type)
        
                return agent
        
            def setup_agent_workers(self, agent: AnyAgent[Any, Any]):
                """Set up workers for an agent from configuration."""
                for worker_config in agent.context.config.workers:
                    try:
                        worker = self.get_agent(worker_config.name)
                        agent.register_worker(
                            worker,
                            name=worker_config.name,
                            reset_history_on_run=worker_config.reset_history_on_run,
                            pass_message_history=worker_config.pass_message_history,
                            share_context=worker_config.share_context,
                        )
                    except KeyError as e:
                        msg = f"Worker agent {worker_config.name!r} not found"
                        raise ValueError(msg) from e
        
            @overload
            def get_agent(
                self,
                agent: AgentName | Agent[Any],
                *,
                model_override: str | None = None,
                session: SessionIdType | SessionQuery = None,
            ) -> Agent[TPoolDeps]: ...
        
            @overload
            def get_agent[TResult](
                self,
                agent: AgentName | Agent[Any],
                *,
                return_type: type[TResult],
                model_override: str | None = None,
                session: SessionIdType | SessionQuery = None,
            ) -> StructuredAgent[TPoolDeps, TResult]: ...
        
            @overload
            def get_agent[TCustomDeps](
                self,
                agent: AgentName | Agent[Any],
                *,
                deps: TCustomDeps,
                model_override: str | None = None,
                session: SessionIdType | SessionQuery = None,
            ) -> Agent[TCustomDeps]: ...
        
            @overload
            def get_agent[TCustomDeps, TResult](
                self,
                agent: AgentName | Agent[Any],
                *,
                deps: TCustomDeps,
                return_type: type[TResult],
                model_override: str | None = None,
                session: SessionIdType | SessionQuery = None,
            ) -> StructuredAgent[TCustomDeps, TResult]: ...
        
            def get_agent(
                self,
                agent: AgentName | Agent[Any],
                *,
                deps: Any | None = None,
                return_type: Any | None = None,
                model_override: str | None = None,
                session: SessionIdType | SessionQuery = None,
            ) -> AnyAgent[Any, Any]:
                """Get or configure an agent from the pool.
        
                This method provides flexible agent configuration with dependency injection:
                - Without deps: Agent uses pool's shared dependencies
                - With deps: Agent uses provided custom dependencies
                - With return_type: Returns a StructuredAgent with type validation
        
                Args:
                    agent: Either agent name or instance
                    deps: Optional custom dependencies (overrides shared deps)
                    return_type: Optional type for structured responses
                    model_override: Optional model override
                    session: Optional session ID or query to recover conversation
        
                Returns:
                    Either:
                    - Agent[TPoolDeps] when using pool's shared deps
                    - Agent[TCustomDeps] when custom deps provided
                    - StructuredAgent when return_type provided
        
                Raises:
                    KeyError: If agent name not found
                    ValueError: If configuration is invalid
                """
                from llmling_agent.agent import Agent
                from llmling_agent.agent.context import AgentContext
        
                # Get base agent
                base = agent if isinstance(agent, Agent) else self.agents[agent]
        
                # Setup context and dependencies
                if base.context is None:
                    base.context = AgentContext[Any].create_default(base.name)
        
                # Use custom deps if provided, otherwise use shared deps
                base.context.data = deps if deps is not None else self.shared_deps
                base.context.pool = self
        
                # Apply overrides
                if model_override:
                    base.set_model(model_override)
        
                if session:
                    base.conversation.load_history_from_database(session=session)
        
                # Convert to structured if needed
                if return_type is not None:
                    return base.to_structured(return_type)
        
                return base
        
            def list_nodes(self) -> list[str]:
                """List available agent names."""
                return list(self.list_items())
        
            def get_job(self, name: str) -> Job[Any, Any]:
                return self._tasks[name]
        
            def register_task(self, name: str, task: Job[Any, Any]):
                self._tasks.register(name, task)
        
            @overload
            async def add_agent(
                self,
                name: AgentName,
                *,
                result_type: None = None,
                **kwargs: Unpack[AgentKwargs],
            ) -> Agent[Any]: ...
        
            @overload
            async def add_agent[TResult](
                self,
                name: AgentName,
                *,
                result_type: type[TResult] | str | ResponseDefinition,
                **kwargs: Unpack[AgentKwargs],
            ) -> StructuredAgent[Any, TResult]: ...
        
            async def add_agent(
                self,
                name: AgentName,
                *,
                result_type: type[Any] | str | ResponseDefinition | None = None,
                **kwargs: Unpack[AgentKwargs],
            ) -> Agent[Any] | StructuredAgent[Any, Any]:
                """Add a new permanent agent to the pool.
        
                Args:
                    name: Name for the new agent
                    result_type: Optional type for structured responses:
                        - None: Regular unstructured agent
                        - type: Python type for validation
                        - str: Name of response definition
                        - ResponseDefinition: Complete response definition
                    **kwargs: Additional agent configuration
        
                Returns:
                    Either a regular Agent or StructuredAgent depending on result_type
                """
                from llmling_agent.agent import Agent
        
                agent: AnyAgent[Any, Any] = Agent(name=name, **kwargs)
                agent.tools.add_provider(self.mcp)
                agent = await self.exit_stack.enter_async_context(agent)
                # Convert to structured if needed
                if result_type is not None:
                    agent = agent.to_structured(result_type)
                self.register(name, agent)
                return agent
        
            def get_mermaid_diagram(
                self,
                include_details: bool = True,
            ) -> str:
                """Generate mermaid flowchart of all agents and their connections.
        
                Args:
                    include_details: Whether to show connection details (types, queues, etc)
                """
                lines = ["flowchart LR"]
        
                # Add all agents as nodes
                for name in self.agents:
                    lines.append(f"    {name}[{name}]")  # noqa: PERF401
        
                # Add all connections as edges
                for agent in self.agents.values():
                    connections = agent.connections.get_connections()
                    for talk in connections:
                        talk = cast(Talk[Any], talk)  # help mypy understand it's a Talk
                        source = talk.source.name
                        for target in talk.targets:
                            if include_details:
                                details: list[str] = []
                                details.append(talk.connection_type)
                                if talk.queued:
                                    details.append(f"queued({talk.queue_strategy})")
                                if fn := talk.filter_condition:  # type: ignore
                                    details.append(f"filter:{fn.__name__}")
                                if fn := talk.stop_condition:  # type: ignore
                                    details.append(f"stop:{fn.__name__}")
                                if fn := talk.exit_condition:  # type: ignore
                                    details.append(f"exit:{fn.__name__}")
        
                                label = f"|{' '.join(details)}|" if details else ""
                                lines.append(f"    {source}--{label}-->{target.name}")
                            else:
                                lines.append(f"    {source}-->{target.name}")
        
                return "\n".join(lines)
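The diagram generation above is a straightforward two-pass build: one node line per agent, then one edge line per connection, with an optional detail label. A minimal, self-contained sketch of that approach (the `SimpleTalk` dataclass and `build_diagram` function here are illustrative stand-ins, not part of the library API):

```python
# Illustrative sketch of the get_mermaid_diagram approach: emit one
# node line per agent, then one edge per connection target, with an
# optional |...| label carrying connection details.
from dataclasses import dataclass


@dataclass
class SimpleTalk:  # hypothetical stand-in for Talk[Any]
    source: str
    targets: list[str]
    connection_type: str = "run"
    queued: bool = False
    queue_strategy: str = "latest"


def build_diagram(
    agents: list[str],
    talks: list[SimpleTalk],
    include_details: bool = True,
) -> str:
    lines = ["flowchart LR"]
    # Declare every agent as a mermaid node
    for name in agents:
        lines.append(f"    {name}[{name}]")
    # Add one edge per (source, target) pair
    for talk in talks:
        for target in talk.targets:
            if include_details:
                details = [talk.connection_type]
                if talk.queued:
                    details.append(f"queued({talk.queue_strategy})")
                label = f"|{' '.join(details)}|"
                lines.append(f"    {talk.source}--{label}-->{target}")
            else:
                lines.append(f"    {talk.source}-->{target}")
    return "\n".join(lines)
```

Feeding the result to any mermaid renderer yields a left-to-right flowchart of the pool's topology.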
        

        _error_class property

        _error_class: type[LLMLingError]
        

        Error class for agent operations.

        agents property

        agents: dict[str, AnyAgent[Any, Any]]
        

        Get agents dict (backward compatibility).

        event_nodes property

        event_nodes: dict[str, EventNode[Any]]
        

Get event nodes dict.

        node_events property

        node_events: DictEvents
        

        Get node events.

        nodes property

        nodes: dict[str, MessageNode[Any, Any]]
        

Get dict of all registered nodes (agents and teams).

        teams property

        teams: dict[str, BaseTeam[Any, Any]]
        

Get teams dict.

        __aenter__ async

        __aenter__() -> Self
        

        Enter async context and initialize all agents.

        Source code in src/llmling_agent/delegation/pool.py
        async def __aenter__(self) -> Self:
            """Enter async context and initialize all agents."""
            try:
                # Add MCP tool provider to all agents
                agents = list(self.agents.values())
                teams = list(self.teams.values())
                for agent in agents:
                    agent.tools.add_provider(self.mcp)
        
                # Collect all components to initialize
                components: list[AbstractAsyncContextManager[Any]] = [
                    self.mcp,
                    *agents,
                    *teams,
                ]
        
                # Add MCP server if configured
                if self.server:
                    components.append(self.server)
        
                # Initialize all components
                if self.parallel_load:
                    await asyncio.gather(
                        *(self.exit_stack.enter_async_context(c) for c in components)
                    )
                else:
                    for component in components:
                        await self.exit_stack.enter_async_context(component)
        
            except Exception as e:
                await self.cleanup()
                msg = "Failed to initialize agent pool"
                logger.exception(msg, exc_info=e)
                raise RuntimeError(msg) from e
            return self
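The initialization step above enters every component through a single `AsyncExitStack`, so one failure unwinds everything already started, and `parallel_load` only changes whether the stack entries happen concurrently. A minimal sketch of that pattern with dummy components (names here are hypothetical, not library API):

```python
# Sketch of the parallel-vs-sequential initialization pattern used in
# __aenter__: all components live on one AsyncExitStack, entered either
# concurrently via asyncio.gather or one by one.
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

started: list[str] = []  # tracks which dummy components are live


@asynccontextmanager
async def component(name: str):
    started.append(name)   # "initialize"
    yield name
    started.remove(name)   # "cleanup" when the stack unwinds


async def init_all(names: list[str], parallel: bool = True) -> None:
    async with AsyncExitStack() as stack:
        cms = [component(n) for n in names]
        if parallel:
            await asyncio.gather(*(stack.enter_async_context(c) for c in cms))
        else:
            for c in cms:
                await stack.enter_async_context(c)
        # Inside the stack, every component is initialized
        assert set(started) == set(names)


asyncio.run(init_all(["mcp", "agent_a", "agent_b"]))
```

When the `async with` block exits, the stack unwinds all components in reverse order, mirroring the pool's `cleanup()` behavior.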
        

        __aexit__ async

        __aexit__(
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        )
        

        Exit async context.

        Source code in src/llmling_agent/delegation/pool.py
        async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ):
            """Exit async context."""
            # Remove MCP tool provider from all agents
            for agent in self.agents.values():
                if self.mcp in agent.tools.providers:
                    agent.tools.remove_provider(self.mcp)
            await self.cleanup()
        

        __init__

        __init__(
            manifest: StrPath | AgentsManifest | None = None,
            *,
            shared_deps: TPoolDeps | None = None,
            connect_nodes: bool = True,
            input_provider: InputProvider | None = None,
            parallel_load: bool = True,
        )
        

        Initialize agent pool with immediate agent creation.

Parameters:

    manifest (StrPath | AgentsManifest | None): Agent configuration manifest. Default: None
    shared_deps (TPoolDeps | None): Dependencies to share across all nodes. Default: None
    connect_nodes (bool): Whether to set up forwarding connections. Default: True
    input_provider (InputProvider | None): Input provider for tool / step confirmations / HumanAgents. Default: None
    parallel_load (bool): Whether to load nodes in parallel (async). Default: True

Raises:

    ValueError: If manifest contains invalid node configurations
    RuntimeError: If node initialization fails

        Source code in src/llmling_agent/delegation/pool.py
        def __init__(
            self,
            manifest: StrPath | AgentsManifest | None = None,
            *,
            shared_deps: TPoolDeps | None = None,
            connect_nodes: bool = True,
            input_provider: InputProvider | None = None,
            parallel_load: bool = True,
        ):
            """Initialize agent pool with immediate agent creation.
        
            Args:
                manifest: Agent configuration manifest
                shared_deps: Dependencies to share across all nodes
                connect_nodes: Whether to set up forwarding connections
                input_provider: Input provider for tool / step confirmations / HumanAgents
                parallel_load: Whether to load nodes in parallel (async)
        
            Raises:
                ValueError: If manifest contains invalid node configurations
                RuntimeError: If node initialization fails
            """
            super().__init__()
            from llmling_agent.models.manifest import AgentsManifest
            from llmling_agent.storage import StorageManager
        
            match manifest:
                case None:
                    self.manifest = AgentsManifest()
                case str():
                    self.manifest = AgentsManifest.from_file(manifest)
                case AgentsManifest():
                    self.manifest = manifest
                case _:
                    msg = f"Invalid config path: {manifest}"
                    raise ValueError(msg)
            self.shared_deps = shared_deps
            self._input_provider = input_provider
            self.exit_stack = AsyncExitStack()
            self.parallel_load = parallel_load
            self.storage = StorageManager(self.manifest.storage)
            self.connection_registry = ConnectionRegistry()
            self.mcp = MCPManager(
                name="pool_mcp", servers=self.manifest.get_mcp_servers(), owner="pool"
            )
            self._tasks = TaskRegistry()
            # Register tasks from manifest
            for name, task in self.manifest.jobs.items():
                self._tasks.register(name, task)
            self.pool_talk = TeamTalk[Any].from_nodes(list(self.nodes.values()))
            if self.manifest.pool_server and self.manifest.pool_server.enabled:
                from llmling_agent.resource_providers.pool import PoolResourceProvider
                from llmling_agent_mcp.server import LLMLingServer
        
                provider = PoolResourceProvider(
                    self, zed_mode=self.manifest.pool_server.zed_mode
                )
                self.server: LLMLingServer | None = LLMLingServer(
                    provider=provider,
                    config=self.manifest.pool_server,
                )
            else:
                self.server = None
            # Create requested agents immediately
            for name in self.manifest.agents:
                agent = self.manifest.get_agent(name, deps=shared_deps)
                self.register(name, agent)
        
            # Then set up worker relationships
            for agent in self.agents.values():
                self.setup_agent_workers(agent)
            self._create_teams()
            # Set up forwarding connections
            if connect_nodes:
                self._connect_nodes()
        

        _connect_nodes

        _connect_nodes()
        

        Set up connections defined in manifest.

        Source code in src/llmling_agent/delegation/pool.py
        def _connect_nodes(self):
            """Set up connections defined in manifest."""
            # Merge agent and team configs into one dict of nodes with connections
            for name, config in self.manifest.nodes.items():
                source = self[name]
                for target in config.connections or []:
                    match target:
                        case NodeConnectionConfig():
                            if target.name not in self:
                                msg = f"Forward target {target.name} not found for {name}"
                                raise ValueError(msg)
                            target_node = self[target.name]
                        case FileConnectionConfig() | CallableConnectionConfig():
                            target_node = Agent(provider=target.get_provider())
                        case _:
                            msg = f"Invalid connection config: {target}"
                            raise ValueError(msg)
        
                    source.connect_to(
                        target_node,  # type: ignore  # recognized as "Any | BaseTeam[Any, Any]" by mypy?
                        connection_type=target.connection_type,
                        name=name,
                        priority=target.priority,
                        delay=target.delay,
                        queued=target.queued,
                        queue_strategy=target.queue_strategy,
                        transform=target.transform,
                        filter_condition=target.filter_condition.check
                        if target.filter_condition
                        else None,
                        stop_condition=target.stop_condition.check
                        if target.stop_condition
                        else None,
                        exit_condition=target.exit_condition.check
                        if target.exit_condition
                        else None,
                    )
                    source.connections.set_wait_state(
                        target_node,
                        wait=target.wait_for_completion,
                    )
        

        _create_teams

        _create_teams()
        

        Create all teams in two phases to allow nesting.

        Source code in src/llmling_agent/delegation/pool.py
        def _create_teams(self):
            """Create all teams in two phases to allow nesting."""
            # Phase 1: Create empty teams
        
            empty_teams: dict[str, BaseTeam[Any, Any]] = {}
            for name, config in self.manifest.teams.items():
                if config.mode == "parallel":
                    empty_teams[name] = Team(
                        [], name=name, shared_prompt=config.shared_prompt
                    )
                else:
                    empty_teams[name] = TeamRun(
                        [], name=name, shared_prompt=config.shared_prompt
                    )
        
            # Phase 2: Resolve members
            for name, config in self.manifest.teams.items():
                team = empty_teams[name]
                members: list[MessageNode[Any, Any]] = []
                for member in config.members:
                    if member in self.agents:
                        members.append(self.agents[member])
                    elif member in empty_teams:
                        members.append(empty_teams[member])
                    else:
                        msg = f"Unknown team member: {member}"
                        raise ValueError(msg)
                team.agents.extend(members)
                self[name] = team
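The two-phase scheme matters because a team's members may themselves be teams declared later in the manifest: phase 1 makes every team name resolvable before phase 2 fills in members. A self-contained sketch of the idea, with plain lists standing in for `Team`/`TeamRun`:

```python
# Sketch of two-phase team creation: register empty containers first,
# then resolve members, so teams can nest regardless of declaration order.
def create_teams(
    team_configs: dict[str, list[str]],
    agents: set[str],
) -> dict[str, list[object]]:
    # Phase 1: create an empty container per team name
    teams: dict[str, list[object]] = {name: [] for name in team_configs}
    # Phase 2: resolve members, which may be agents or other teams
    for name, members in team_configs.items():
        for member in members:
            if member in agents:
                teams[name].append(member)
            elif member in teams:
                teams[name].append(teams[member])
            else:
                msg = f"Unknown team member: {member}"
                raise ValueError(msg)
    return teams
```

Because phase 2 appends the shared container object, a nested team reference stays live even if its own members are resolved afterwards.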
        

        _validate_item

        _validate_item(item: MessageEmitter[Any, Any] | Any) -> MessageEmitter[Any, Any]
        

        Validate and convert items before registration.

Parameters:

    item (MessageEmitter[Any, Any] | Any): Item to validate. Required.

Returns:

    MessageEmitter[Any, Any]: Validated node

Raises:

    LLMLingError: If item is not a valid node

        Source code in src/llmling_agent/delegation/pool.py
        def _validate_item(
            self, item: MessageEmitter[Any, Any] | Any
        ) -> MessageEmitter[Any, Any]:
            """Validate and convert items before registration.
        
            Args:
                item: Item to validate
        
            Returns:
                Validated Node
        
            Raises:
                LLMlingError: If item is not a valid node
            """
            if not isinstance(item, MessageEmitter):
                msg = f"Item must be Agent or Team, got {type(item)}"
                raise self._error_class(msg)
            item.context.pool = self
            return item
        
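The check above gates everything that enters the registry: reject anything that is not a valid node, and wire a back-reference to the pool on success. A minimal standalone sketch of the validate-then-register pattern (stand-in classes, not the real MessageEmitter):

```python
# Validate-then-register sketch: the registry only accepts Node instances
# and sets a pool back-reference, mirroring item.context.pool = self.

class NodeError(Exception):
    """Raised when a non-node is registered (stands in for LLMlingError)."""

class Node:
    def __init__(self) -> None:
        self.pool: "MiniPool | None" = None

class MiniPool:
    def __init__(self) -> None:
        self._items: dict[str, Node] = {}

    def register(self, name: str, item: object) -> Node:
        if not isinstance(item, Node):
            raise NodeError(f"Item must be a Node, got {type(item)}")
        item.pool = self  # back-reference, like item.context.pool = self
        self._items[name] = item
        return item
```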

        add_agent async

        add_agent(
            name: AgentName, *, result_type: None = None, **kwargs: Unpack[AgentKwargs]
        ) -> Agent[Any]
        
        add_agent(
            name: AgentName,
            *,
            result_type: type[TResult] | str | ResponseDefinition,
            **kwargs: Unpack[AgentKwargs],
        ) -> StructuredAgent[Any, TResult]
        
        add_agent(
            name: AgentName,
            *,
            result_type: type[Any] | str | ResponseDefinition | None = None,
            **kwargs: Unpack[AgentKwargs],
        ) -> Agent[Any] | StructuredAgent[Any, Any]
        

        Add a new permanent agent to the pool.

        Parameters:

        Name Type Description Default
        name AgentName

        Name for the new agent

        required
        result_type type[Any] | str | ResponseDefinition | None

Optional type for structured responses:
- None: Regular unstructured agent
- type: Python type for validation
- str: Name of response definition
- ResponseDefinition: Complete response definition

        None
        **kwargs Unpack[AgentKwargs]

        Additional agent configuration

        {}

        Returns:

        Type Description
        Agent[Any] | StructuredAgent[Any, Any]

        Either a regular Agent or StructuredAgent depending on result_type

        Source code in src/llmling_agent/delegation/pool.py
        async def add_agent(
            self,
            name: AgentName,
            *,
            result_type: type[Any] | str | ResponseDefinition | None = None,
            **kwargs: Unpack[AgentKwargs],
        ) -> Agent[Any] | StructuredAgent[Any, Any]:
            """Add a new permanent agent to the pool.
        
            Args:
                name: Name for the new agent
                result_type: Optional type for structured responses:
                    - None: Regular unstructured agent
                    - type: Python type for validation
                    - str: Name of response definition
                    - ResponseDefinition: Complete response definition
                **kwargs: Additional agent configuration
        
            Returns:
                Either a regular Agent or StructuredAgent depending on result_type
            """
            from llmling_agent.agent import Agent
        
            agent: AnyAgent[Any, Any] = Agent(name=name, **kwargs)
            agent.tools.add_provider(self.mcp)
            agent = await self.exit_stack.enter_async_context(agent)
            # Convert to structured if needed
            if result_type is not None:
                agent = agent.to_structured(result_type)
            self.register(name, agent)
            return agent
        
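A hedged usage sketch for `add_agent`. It assumes llmling_agent is installed and configured; the no-argument `AgentPool()` constructor, `async with` support, and the `model` keyword are assumptions drawn from the surrounding docs, and `"writer"`/`"rater"` are hypothetical agent names:

```python
import asyncio

async def demo_add_agent() -> None:
    # Requires llmling_agent; imported lazily so the sketch stays importable.
    from llmling_agent.delegation.pool import AgentPool

    async with AgentPool() as pool:  # assumed async-context-manager usage
        # No result_type -> plain Agent[Any]
        writer = await pool.add_agent("writer", model="openai:gpt-4o-mini")
        # With result_type -> StructuredAgent validating the response type
        rater = await pool.add_agent(
            "rater", result_type=int, model="openai:gpt-4o-mini"
        )
        print(writer.name, rater.name)
```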

        cleanup async

        cleanup()
        

        Clean up all agents.

        Source code in src/llmling_agent/delegation/pool.py
        async def cleanup(self):
            """Clean up all agents."""
            await self.exit_stack.aclose()
            self.clear()
        
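`cleanup` closes the pool's exit stack, which unwinds every agent entered via `enter_async_context` in reverse order, then clears the registry. A stdlib-only sketch of that pattern:

```python
import asyncio
from contextlib import AsyncExitStack

class Resource:
    """Stand-in for an agent: records when it is opened and closed."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name, self.log = name, log

    async def __aenter__(self) -> "Resource":
        self.log.append(f"open {self.name}")
        return self

    async def __aexit__(self, *exc: object) -> None:
        self.log.append(f"close {self.name}")

async def main() -> list[str]:
    log: list[str] = []
    stack = AsyncExitStack()
    for name in ("a", "b"):
        await stack.enter_async_context(Resource(name, log))
    await stack.aclose()  # like cleanup(): closes in reverse (LIFO) order
    return log

log = asyncio.run(main())
```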

        clone_agent async

        clone_agent(
            agent: AgentName | Agent[TDeps],
            new_name: AgentName | None = None,
            *,
            system_prompts: list[str] | None = None,
            template_context: dict[str, Any] | None = None,
        ) -> Agent[TDeps]
        
        clone_agent(
            agent: StructuredAgent[TDeps, TResult],
            new_name: AgentName | None = None,
            *,
            system_prompts: list[str] | None = None,
            template_context: dict[str, Any] | None = None,
        ) -> StructuredAgent[TDeps, TResult]
        
        clone_agent(
            agent: AgentName | AnyAgent[TDeps, TAgentResult],
            new_name: AgentName | None = None,
            *,
            system_prompts: list[str] | None = None,
            template_context: dict[str, Any] | None = None,
        ) -> AnyAgent[TDeps, TAgentResult]
        

        Create a copy of an agent.

        Parameters:

        Name Type Description Default
        agent AgentName | AnyAgent[TDeps, TAgentResult]

        Agent instance or name to clone

        required
        new_name AgentName | None

        Optional name for the clone

        None
        system_prompts list[str] | None

        Optional different prompts

        None
        template_context dict[str, Any] | None

        Variables for template rendering

        None

        Returns:

        Type Description
        AnyAgent[TDeps, TAgentResult]

        The new agent instance

        Source code in src/llmling_agent/delegation/pool.py
        async def clone_agent[TDeps, TAgentResult](
            self,
            agent: AgentName | AnyAgent[TDeps, TAgentResult],
            new_name: AgentName | None = None,
            *,
            system_prompts: list[str] | None = None,
            template_context: dict[str, Any] | None = None,
        ) -> AnyAgent[TDeps, TAgentResult]:
            """Create a copy of an agent.
        
            Args:
                agent: Agent instance or name to clone
                new_name: Optional name for the clone
                system_prompts: Optional different prompts
                template_context: Variables for template rendering
        
            Returns:
                The new agent instance
            """
            from llmling_agent.agent import Agent, StructuredAgent
        
            # Get original config
            if isinstance(agent, str):
                if agent not in self.manifest.agents:
                    msg = f"Agent {agent} not found"
                    raise KeyError(msg)
                config = self.manifest.agents[agent]
                original_agent: AnyAgent[Any, Any] = self.get_agent(agent)
            else:
                config = agent.context.config  # type: ignore
                original_agent = agent
        
            # Create new config
            new_config = config.model_copy(deep=True)
        
            # Apply overrides
            if system_prompts:
                new_config.system_prompts = system_prompts
        
            # Handle template rendering
            if template_context:
                new_config.system_prompts = new_config.render_system_prompts(template_context)
        
            # Create new agent with same runtime
            new_agent = Agent[TDeps](
                runtime=original_agent.runtime,
                context=original_agent.context,
                # result_type=original_agent.actual_type,
                provider=new_config.get_provider(),
                system_prompt=new_config.system_prompts,
                name=new_name or f"{config.name}_copy_{len(self.agents)}",
            )
            if isinstance(original_agent, StructuredAgent):
                new_agent = new_agent.to_structured(original_agent.actual_type)
        
            # Register in pool
            agent_name = new_agent.name
            self.manifest.agents[agent_name] = new_config
            self.agents[agent_name] = new_agent
            return await self.exit_stack.enter_async_context(new_agent)
        
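A hedged usage sketch for `clone_agent`. It assumes llmling_agent is installed; `"writer"` is a hypothetical configured agent, and the `{{ topic }}` placeholder assumes the configured prompts use template syntax that `render_system_prompts` understands:

```python
import asyncio

async def demo_clone_agent() -> None:
    # Requires llmling_agent; imported lazily so the sketch stays importable.
    from llmling_agent.delegation.pool import AgentPool

    async with AgentPool() as pool:  # assumed async-context-manager usage
        clone = await pool.clone_agent(
            "writer",                  # name of a configured agent
            new_name="writer_formal",
            system_prompts=["Write formally about {{ topic }}."],
            template_context={"topic": "databases"},  # renders the prompts
        )
        print(clone.name)
```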

        create_agent async

        create_agent(
            name: AgentName,
            *,
            session: SessionIdType | SessionQuery = None,
            name_override: str | None = None,
        ) -> Agent[TPoolDeps]
        
        create_agent(
            name: AgentName,
            *,
            deps: TCustomDeps,
            session: SessionIdType | SessionQuery = None,
            name_override: str | None = None,
        ) -> Agent[TCustomDeps]
        
        create_agent(
            name: AgentName,
            *,
            return_type: type[TResult],
            session: SessionIdType | SessionQuery = None,
            name_override: str | None = None,
        ) -> StructuredAgent[TPoolDeps, TResult]
        
        create_agent(
            name: AgentName,
            *,
            deps: TCustomDeps,
            return_type: type[TResult],
            session: SessionIdType | SessionQuery = None,
            name_override: str | None = None,
        ) -> StructuredAgent[TCustomDeps, TResult]
        
        create_agent(
            name: AgentName,
            *,
            deps: Any | None = None,
            return_type: Any | None = None,
            session: SessionIdType | SessionQuery = None,
            name_override: str | None = None,
        ) -> AnyAgent[Any, Any]
        

        Create a new agent instance from configuration.

        Parameters:

        Name Type Description Default
        name AgentName

        Name of the agent configuration to use

        required
        deps Any | None

        Optional custom dependencies (overrides pool deps)

        None
        return_type Any | None

        Optional type for structured responses

        None
        session SessionIdType | SessionQuery

        Optional session ID or query to recover conversation

        None
        name_override str | None

        Optional different name for this instance

        None

        Returns:

        Type Description
        AnyAgent[Any, Any]

        New agent instance with the specified configuration

        Raises:

        Type Description
        KeyError

        If agent configuration not found

        ValueError

        If configuration is invalid

        Source code in src/llmling_agent/delegation/pool.py
        async def create_agent(
            self,
            name: AgentName,
            *,
            deps: Any | None = None,
            return_type: Any | None = None,
            session: SessionIdType | SessionQuery = None,
            name_override: str | None = None,
        ) -> AnyAgent[Any, Any]:
            """Create a new agent instance from configuration.
        
            Args:
                name: Name of the agent configuration to use
                deps: Optional custom dependencies (overrides pool deps)
                return_type: Optional type for structured responses
                session: Optional session ID or query to recover conversation
                name_override: Optional different name for this instance
        
            Returns:
                New agent instance with the specified configuration
        
            Raises:
                KeyError: If agent configuration not found
                ValueError: If configuration is invalid
            """
            if name not in self.manifest.agents:
                msg = f"Agent configuration {name!r} not found"
                raise KeyError(msg)
        
            # Use Manifest.get_agent for proper initialization
            final_deps = deps if deps is not None else self.shared_deps
            agent = self.manifest.get_agent(name, deps=final_deps)
            # Override name if requested
            if name_override:
                agent.name = name_override
        
            # Set pool reference
            agent.context.pool = self
        
            # Handle session if provided
            if session:
                agent.conversation.load_history_from_database(session=session)
        
            # Initialize agent through exit stack
            agent = await self.exit_stack.enter_async_context(agent)
        
            # Override structured configuration if provided
            if return_type is not None:
                return agent.to_structured(return_type)
        
            return agent
        
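A hedged usage sketch for `create_agent`, which builds a fresh instance from a manifest configuration (unlike `get_agent`, which reuses the registered one). Assumes llmling_agent is installed; `"analyst"`, the deps dict, and the session id are hypothetical:

```python
import asyncio

async def demo_create_agent() -> None:
    # Requires llmling_agent; imported lazily so the sketch stays importable.
    from llmling_agent.delegation.pool import AgentPool

    async with AgentPool() as pool:  # assumed async-context-manager usage
        agent = await pool.create_agent(
            "analyst",                              # manifest config name
            deps={"db_url": "sqlite:///demo.db"},   # hypothetical custom deps
            return_type=str,                        # -> StructuredAgent
            session="previous-session-id",          # hypothetical session id
            name_override="analyst_2",
        )
        print(agent.name)
```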

        create_team

        create_team(agents: Sequence[str]) -> Team[TPoolDeps]
        
        create_team(
            agents: Sequence[MessageNode[TDeps, Any]],
            *,
            name: str | None = None,
            description: str | None = None,
            shared_prompt: str | None = None,
            picker: AnyAgent[Any, Any] | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> Team[TDeps]
        
        create_team(
            agents: Sequence[AgentName | MessageNode[Any, Any]],
            *,
            name: str | None = None,
            description: str | None = None,
            shared_prompt: str | None = None,
            picker: AnyAgent[Any, Any] | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> Team[Any]
        
        create_team(
            agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
            *,
            name: str | None = None,
            description: str | None = None,
            shared_prompt: str | None = None,
            picker: AnyAgent[Any, Any] | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> Team[Any]
        

        Create a group from agent names or instances.

        Parameters:

        Name Type Description Default
        agents Sequence[AgentName | MessageNode[Any, Any]] | None

        List of agent names or instances (all if None)

        None
        name str | None

        Optional name for the team

        None
        description str | None

        Optional description for the team

        None
        shared_prompt str | None

        Optional prompt for all agents

        None
        picker AnyAgent[Any, Any] | None

        Agent to use for picking agents

        None
        num_picks int | None

        Number of agents to pick

        None
        pick_prompt str | None

        Prompt to use for picking agents

        None
        Source code in src/llmling_agent/delegation/pool.py
        def create_team(
            self,
            agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
            *,
            name: str | None = None,
            description: str | None = None,
            shared_prompt: str | None = None,
            picker: AnyAgent[Any, Any] | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> Team[Any]:
            """Create a group from agent names or instances.
        
            Args:
                agents: List of agent names or instances (all if None)
                name: Optional name for the team
                description: Optional description for the team
                shared_prompt: Optional prompt for all agents
                picker: Agent to use for picking agents
                num_picks: Number of agents to pick
                pick_prompt: Prompt to use for picking agents
            """
            from llmling_agent.delegation.team import Team
        
            if agents is None:
                agents = list(self.agents.keys())
        
            # First resolve/configure agents
            resolved_agents: list[MessageNode[Any, Any]] = []
            for agent in agents:
                if isinstance(agent, str):
                    agent = self.get_agent(agent)
                resolved_agents.append(agent)
        
            team = Team(
                name=name,
                description=description,
                agents=resolved_agents,
                shared_prompt=shared_prompt,
                picker=picker,
                num_picks=num_picks,
                pick_prompt=pick_prompt,
            )
            if name:
                self[name] = team
            return team
        
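A hedged usage sketch for `create_team`. Assumes llmling_agent is installed; `"writer"`, `"critic"`, and `"router"` are hypothetical configured agents. Passing `name` registers the team in the pool:

```python
import asyncio

async def demo_create_team() -> None:
    # Requires llmling_agent; imported lazily so the sketch stays importable.
    from llmling_agent.delegation.pool import AgentPool

    async with AgentPool() as pool:  # assumed async-context-manager usage
        team = pool.create_team(
            ["writer", "critic"],            # names or instances; None = all
            name="reviewers",                # registers the team in the pool
            shared_prompt="Review the draft below.",
            picker=pool.get_agent("router"), # optional: an agent picks members
            num_picks=1,
        )
        print(team.name)
```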

        create_team_run

        create_team_run(
            agents: Sequence[str],
            validator: MessageNode[Any, TResult] | None = None,
            *,
            name: str | None = None,
            description: str | None = None,
            shared_prompt: str | None = None,
            picker: AnyAgent[Any, Any] | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> TeamRun[TPoolDeps, TResult]
        
        create_team_run(
            agents: Sequence[MessageNode[TDeps, Any]],
            validator: MessageNode[Any, TResult] | None = None,
            *,
            name: str | None = None,
            description: str | None = None,
            shared_prompt: str | None = None,
            picker: AnyAgent[Any, Any] | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> TeamRun[TDeps, TResult]
        
        create_team_run(
            agents: Sequence[AgentName | MessageNode[Any, Any]],
            validator: MessageNode[Any, TResult] | None = None,
            *,
            name: str | None = None,
            description: str | None = None,
            shared_prompt: str | None = None,
            picker: AnyAgent[Any, Any] | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> TeamRun[Any, TResult]
        
        create_team_run(
            agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
            validator: MessageNode[Any, TResult] | None = None,
            *,
            name: str | None = None,
            description: str | None = None,
            shared_prompt: str | None = None,
            picker: AnyAgent[Any, Any] | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> TeamRun[Any, TResult]
        

Create a sequential TeamRun from a list of Agents.

        Parameters:

        Name Type Description Default
        agents Sequence[AgentName | MessageNode[Any, Any]] | None

        List of agent names or team/agent instances (all if None)

        None
        validator MessageNode[Any, TResult] | None

        Node to validate the results of the TeamRun

        None
        name str | None

        Optional name for the team

        None
        description str | None

        Optional description for the team

        None
        shared_prompt str | None

        Optional prompt for all agents

        None
        picker AnyAgent[Any, Any] | None

        Agent to use for picking agents

        None
        num_picks int | None

        Number of agents to pick

        None
        pick_prompt str | None

        Prompt to use for picking agents

        None
        Source code in src/llmling_agent/delegation/pool.py
        def create_team_run(
            self,
            agents: Sequence[AgentName | MessageNode[Any, Any]] | None = None,
            validator: MessageNode[Any, TResult] | None = None,
            *,
            name: str | None = None,
            description: str | None = None,
            shared_prompt: str | None = None,
            picker: AnyAgent[Any, Any] | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ) -> TeamRun[Any, TResult]:
            """Create a a sequential TeamRun from a list of Agents.
        
            Args:
                agents: List of agent names or team/agent instances (all if None)
                validator: Node to validate the results of the TeamRun
                name: Optional name for the team
                description: Optional description for the team
                shared_prompt: Optional prompt for all agents
                picker: Agent to use for picking agents
                num_picks: Number of agents to pick
                pick_prompt: Prompt to use for picking agents
            """
            from llmling_agent.delegation.teamrun import TeamRun
        
            if agents is None:
                agents = list(self.agents.keys())
        
            # First resolve/configure agents
            resolved_agents: list[MessageNode[Any, Any]] = []
            for agent in agents:
                if isinstance(agent, str):
                    agent = self.get_agent(agent)
                resolved_agents.append(agent)
            team = TeamRun(
                resolved_agents,
                name=name,
                description=description,
                validator=validator,
                shared_prompt=shared_prompt,
                picker=picker,
                num_picks=num_picks,
                pick_prompt=pick_prompt,
            )
            if name:
                self[name] = team
            return team
        
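A hedged usage sketch for `create_team_run`, which chains agents sequentially and optionally validates the final output. Assumes llmling_agent is installed; `"writer"`, `"editor"`, and `"checker"` are hypothetical configured agents:

```python
import asyncio

async def demo_create_team_run() -> None:
    # Requires llmling_agent; imported lazily so the sketch stays importable.
    from llmling_agent.delegation.pool import AgentPool

    async with AgentPool() as pool:  # assumed async-context-manager usage
        # Sequential pipeline: output flows writer -> editor, then the
        # validator node checks the final result.
        pipeline = pool.create_team_run(
            ["writer", "editor"],
            validator=pool.get_agent("checker"),
            name="draft_pipeline",       # registers the TeamRun in the pool
        )
        print(pipeline.name)
```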

        get_agent

        get_agent(
            agent: AgentName | Agent[Any],
            *,
            model_override: str | None = None,
            session: SessionIdType | SessionQuery = None,
        ) -> Agent[TPoolDeps]
        
        get_agent(
            agent: AgentName | Agent[Any],
            *,
            return_type: type[TResult],
            model_override: str | None = None,
            session: SessionIdType | SessionQuery = None,
        ) -> StructuredAgent[TPoolDeps, TResult]
        
        get_agent(
            agent: AgentName | Agent[Any],
            *,
            deps: TCustomDeps,
            model_override: str | None = None,
            session: SessionIdType | SessionQuery = None,
        ) -> Agent[TCustomDeps]
        
        get_agent(
            agent: AgentName | Agent[Any],
            *,
            deps: TCustomDeps,
            return_type: type[TResult],
            model_override: str | None = None,
            session: SessionIdType | SessionQuery = None,
        ) -> StructuredAgent[TCustomDeps, TResult]
        
        get_agent(
            agent: AgentName | Agent[Any],
            *,
            deps: Any | None = None,
            return_type: Any | None = None,
            model_override: str | None = None,
            session: SessionIdType | SessionQuery = None,
        ) -> AnyAgent[Any, Any]
        

        Get or configure an agent from the pool.

This method provides flexible agent configuration with dependency injection:
- Without deps: Agent uses pool's shared dependencies
- With deps: Agent uses provided custom dependencies
- With return_type: Returns a StructuredAgent with type validation

        Parameters:

        Name Type Description Default
        agent AgentName | Agent[Any]

        Either agent name or instance

        required
        deps Any | None

        Optional custom dependencies (overrides shared deps)

        None
        return_type Any | None

        Optional type for structured responses

        None
        model_override str | None

        Optional model override

        None
        session SessionIdType | SessionQuery

        Optional session ID or query to recover conversation

        None

        Returns:

Type Description
AnyAgent[Any, Any]

Either:
- Agent[TPoolDeps] when using pool's shared deps
- Agent[TCustomDeps] when custom deps provided
- StructuredAgent when return_type provided

        Raises:

        Type Description
        KeyError

        If agent name not found

        ValueError

        If configuration is invalid

        Source code in src/llmling_agent/delegation/pool.py
        def get_agent(
            self,
            agent: AgentName | Agent[Any],
            *,
            deps: Any | None = None,
            return_type: Any | None = None,
            model_override: str | None = None,
            session: SessionIdType | SessionQuery = None,
        ) -> AnyAgent[Any, Any]:
            """Get or configure an agent from the pool.
        
            This method provides flexible agent configuration with dependency injection:
            - Without deps: Agent uses pool's shared dependencies
            - With deps: Agent uses provided custom dependencies
            - With return_type: Returns a StructuredAgent with type validation
        
            Args:
                agent: Either agent name or instance
                deps: Optional custom dependencies (overrides shared deps)
                return_type: Optional type for structured responses
                model_override: Optional model override
                session: Optional session ID or query to recover conversation
        
            Returns:
                Either:
                - Agent[TPoolDeps] when using pool's shared deps
                - Agent[TCustomDeps] when custom deps provided
                - StructuredAgent when return_type provided
        
            Raises:
                KeyError: If agent name not found
                ValueError: If configuration is invalid
            """
            from llmling_agent.agent import Agent
            from llmling_agent.agent.context import AgentContext
        
            # Get base agent
            base = agent if isinstance(agent, Agent) else self.agents[agent]
        
            # Setup context and dependencies
            if base.context is None:
                base.context = AgentContext[Any].create_default(base.name)
        
            # Use custom deps if provided, otherwise use shared deps
            base.context.data = deps if deps is not None else self.shared_deps
            base.context.pool = self
        
            # Apply overrides
            if model_override:
                base.set_model(model_override)
        
            if session:
                base.conversation.load_history_from_database(session=session)
        
            # Convert to structured if needed
            if return_type is not None:
                return base.to_structured(return_type)
        
            return base
        
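A hedged usage sketch for `get_agent`, which configures and returns the registered instance (note it is synchronous, unlike `create_agent`). Assumes llmling_agent is installed; `"helper"` and the deps dict are hypothetical:

```python
import asyncio

async def demo_get_agent() -> None:
    # Requires llmling_agent; imported lazily so the sketch stays importable.
    from llmling_agent.delegation.pool import AgentPool

    async with AgentPool() as pool:  # assumed async-context-manager usage
        # Shared deps, plain agent:
        helper = pool.get_agent("helper")
        # Custom deps plus structured output -> StructuredAgent:
        scorer = pool.get_agent(
            "helper",
            deps={"threshold": 0.5},            # hypothetical deps object
            return_type=float,
            model_override="openai:gpt-4o-mini",
        )
        print(helper.name, scorer.name)
```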

        get_mermaid_diagram

        get_mermaid_diagram(include_details: bool = True) -> str
        

        Generate mermaid flowchart of all agents and their connections.

        Parameters:

        Name Type Description Default
        include_details bool

Whether to show connection details (types, queues, etc.)

        True
        Source code in src/llmling_agent/delegation/pool.py
        def get_mermaid_diagram(
            self,
            include_details: bool = True,
        ) -> str:
            """Generate mermaid flowchart of all agents and their connections.
        
            Args:
                include_details: Whether to show connection details (types, queues, etc)
            """
            lines = ["flowchart LR"]
        
            # Add all agents as nodes
            for name in self.agents:
                lines.append(f"    {name}[{name}]")  # noqa: PERF401
        
            # Add all connections as edges
            for agent in self.agents.values():
                connections = agent.connections.get_connections()
                for talk in connections:
                    talk = cast(Talk[Any], talk)  # help mypy understand it's a Talk
                    source = talk.source.name
                    for target in talk.targets:
                        if include_details:
                            details: list[str] = []
                            details.append(talk.connection_type)
                            if talk.queued:
                                details.append(f"queued({talk.queue_strategy})")
                            if fn := talk.filter_condition:  # type: ignore
                                details.append(f"filter:{fn.__name__}")
                            if fn := talk.stop_condition:  # type: ignore
                                details.append(f"stop:{fn.__name__}")
                            if fn := talk.exit_condition:  # type: ignore
                                details.append(f"exit:{fn.__name__}")
        
                            label = f"|{' '.join(details)}|" if details else ""
                            lines.append(f"    {source}--{label}-->{target.name}")
                        else:
                            lines.append(f"    {source}-->{target.name}")
        
            return "\n".join(lines)
        

        list_nodes

        list_nodes() -> list[str]
        

        List available node names.

        Source code in src/llmling_agent/delegation/pool.py
        def list_nodes(self) -> list[str]:
            """List available node names."""
            return list(self.list_items())
        

        run_event_loop async

        run_event_loop()
        

        Run pool in event-watching mode until interrupted.

        Source code in src/llmling_agent/delegation/pool.py
        async def run_event_loop(self):
            """Run pool in event-watching mode until interrupted."""
            import sys
        
            print("Starting event watch mode...")
            print("Active nodes: ", ", ".join(self.list_nodes()))
            print("Press Ctrl+C to stop")
        
            stop_event = asyncio.Event()

            if sys.platform != "win32":
                # Unix: signal handlers set the stop event, which ends the wait
                loop = asyncio.get_running_loop()
                for sig in (signal.SIGINT, signal.SIGTERM):
                    loop.add_signal_handler(sig, stop_event.set)
                await stop_event.wait()
            else:
                # Windows: rely on keyboard interrupt
                with suppress(KeyboardInterrupt):
                    while True:
                        await asyncio.sleep(1)
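
The Unix branch boils down to a handler setting an `asyncio.Event` that the coroutine waits on. A portable standalone sketch of that shape, with a timer standing in for the `add_signal_handler` registration:

```python
import asyncio

async def wait_for_stop(trigger_after: float) -> str:
    # Sketch of the event-watching pattern above: the coroutine blocks on a
    # stop event until something sets it. A timer stands in for the
    # SIGINT/SIGTERM handler so the example runs anywhere.
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    loop.call_later(trigger_after, stop_event.set)  # stand-in for add_signal_handler
    await stop_event.wait()
    return "stopped"

result = asyncio.run(wait_for_stop(0.01))
print(result)
```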
        

        setup_agent_workers

        setup_agent_workers(agent: AnyAgent[Any, Any])
        

        Set up workers for an agent from configuration.

        Source code in src/llmling_agent/delegation/pool.py
        def setup_agent_workers(self, agent: AnyAgent[Any, Any]):
            """Set up workers for an agent from configuration."""
            for worker_config in agent.context.config.workers:
                try:
                    worker = self.get_agent(worker_config.name)
                    agent.register_worker(
                        worker,
                        name=worker_config.name,
                        reset_history_on_run=worker_config.reset_history_on_run,
                        pass_message_history=worker_config.pass_message_history,
                        share_context=worker_config.share_context,
                    )
                except KeyError as e:
                    msg = f"Worker agent {worker_config.name!r} not found"
                    raise ValueError(msg) from e
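
The lookup-and-wrap pattern above, reduced to a standalone sketch: a missing worker name surfaces as a `ValueError` with the original `KeyError` chained as its cause (the `registry` dict here is an illustrative stand-in for the pool's agent registry):

```python
# Stand-in registry; in the pool this would be the registered agents.
registry = {"helper": object()}

def get_worker(name: str):
    # Translate the low-level KeyError into a domain-level ValueError,
    # chaining the original exception for debugging (`from e`).
    try:
        return registry[name]
    except KeyError as e:
        raise ValueError(f"Worker agent {name!r} not found") from e

try:
    get_worker("missing")
except ValueError as err:
    print(type(err.__cause__).__name__)  # → KeyError
```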
        

        track_message_flow async

        track_message_flow() -> AsyncIterator[MessageFlowTracker]
        

        Track message flow during a context.

        Source code in src/llmling_agent/delegation/pool.py
        @asynccontextmanager
        async def track_message_flow(self) -> AsyncIterator[MessageFlowTracker]:
            """Track message flow during a context."""
            tracker = MessageFlowTracker()
            self.connection_registry.message_flow.connect(tracker.track)
            try:
                yield tracker
            finally:
                self.connection_registry.message_flow.disconnect(tracker.track)
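
The connect-on-entry, always-disconnect-on-exit shape of this context manager can be sketched standalone. The `Signal` and `Tracker` classes below are tiny stand-ins for the registry's `message_flow` signal and `MessageFlowTracker`, not the real implementations:

```python
import asyncio
from contextlib import asynccontextmanager

class Signal:
    # Tiny stand-in for connection_registry.message_flow.
    def __init__(self):
        self._handlers = []
    def connect(self, fn):
        self._handlers.append(fn)
    def disconnect(self, fn):
        self._handlers.remove(fn)
    def emit(self, event):
        for fn in list(self._handlers):
            fn(event)

class Tracker:
    # Stand-in for MessageFlowTracker: records every event it sees.
    def __init__(self):
        self.events = []
    def track(self, event):
        self.events.append(event)

message_flow = Signal()

@asynccontextmanager
async def track_flow():
    # Same shape as track_message_flow above: connect on entry,
    # disconnect in finally so tracking stops even on error.
    tracker = Tracker()
    message_flow.connect(tracker.track)
    try:
        yield tracker
    finally:
        message_flow.disconnect(tracker.track)

async def main():
    async with track_flow() as tracker:
        message_flow.emit("hello")
    message_flow.emit("ignored")  # emitted after exit, not tracked
    return tracker.events

print(asyncio.run(main()))  # → ['hello']
```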
        

        BaseTeam

        Bases: MessageNode[TDeps, TResult]

        Base class for Team and TeamRun.

        Source code in src/llmling_agent/delegation/base_team.py
        class BaseTeam[TDeps, TResult](MessageNode[TDeps, TResult]):
            """Base class for Team and TeamRun."""
        
            def __init__(
                self,
                agents: Sequence[MessageNode[TDeps, TResult]],
                *,
                name: str | None = None,
                description: str | None = None,
                shared_prompt: str | None = None,
                mcp_servers: list[str | MCPServerConfig] | None = None,
                picker: AnyAgent[Any, Any] | None = None,
                num_picks: int | None = None,
                pick_prompt: str | None = None,
            ):
                """Common variables only for typing."""
                from llmling_agent.delegation.teamrun import ExtendedTeamTalk
        
                self._name = name or " & ".join([i.name for i in agents])
                self.agents = EventedList[MessageNode]()
                self.agents.events.inserted.connect(self._on_node_added)
                self.agents.events.removed.connect(self._on_node_removed)
                self.agents.events.changed.connect(self._on_node_changed)
                super().__init__(
                    name=self._name,
                    context=self.context,
                    mcp_servers=mcp_servers,
                    description=description,
                )
                self.agents.extend(list(agents))
                self._team_talk = ExtendedTeamTalk()
                self.shared_prompt = shared_prompt
                self._main_task: asyncio.Task[Any] | None = None
                self._infinite = False
                self.picker = picker
                self.num_picks = num_picks
                self.pick_prompt = pick_prompt
        
            async def pick_agents(self, task: str) -> Sequence[MessageNode[Any, Any]]:
                """Pick agents to run."""
                if self.picker:
                    if self.num_picks == 1:
                        result = await self.picker.talk.pick(self, task, self.pick_prompt)
                        return [result.selection]
                    result = await self.picker.talk.pick_multiple(
                        self,
                        task,
                        min_picks=self.num_picks or 1,
                        max_picks=self.num_picks,
                        prompt=self.pick_prompt,
                    )
                    return result.selections
                return list(self.agents)
        
            def _on_node_changed(self, index: int, old: MessageNode, new: MessageNode):
                """Handle node replacement in the agents list."""
                self._on_node_removed(index, old)
                self._on_node_added(index, new)
        
            def _on_node_added(self, index: int, node: MessageNode[Any, Any]):
                """Handler for adding nodes to the team."""
                from llmling_agent.agent import Agent, StructuredAgent
        
                if isinstance(node, Agent | StructuredAgent):
                    node.tools.add_provider(self.mcp)
                # TODO: Right now connecting here is not desired since emission means db logging
                # ideally db logging would not rely on the "public" agent signal.
        
                # node.tool_used.connect(self.tool_used)
        
            def _on_node_removed(self, index: int, node: MessageNode[Any, Any]):
                """Handler for removing nodes from the team."""
                from llmling_agent.agent import Agent, StructuredAgent
        
                if isinstance(node, Agent | StructuredAgent):
                    node.tools.remove_provider(self.mcp)
                # node.tool_used.disconnect(self.tool_used)
        
            def __repr__(self) -> str:
                """Create readable representation."""
                members = ", ".join(agent.name for agent in self.agents)
                name = f" ({self.name})" if self.name else ""
                return f"{self.__class__.__name__}[{len(self.agents)}]{name}: {members}"
        
            def __len__(self) -> int:
                """Get number of team members."""
                return len(self.agents)
        
            def __iter__(self) -> Iterator[MessageNode[TDeps, TResult]]:
                """Iterate over team members."""
                return iter(self.agents)
        
            def __getitem__(self, index_or_name: int | str) -> MessageNode[TDeps, TResult]:
                """Get team member by index or name."""
                if isinstance(index_or_name, str):
                    return next(agent for agent in self.agents if agent.name == index_or_name)
                return self.agents[index_or_name]
        
            def __or__(
                self,
                other: AnyAgent[Any, Any] | ProcessorCallback[Any] | BaseTeam[Any, Any],
            ) -> TeamRun[Any, Any]:
                """Create a sequential pipeline."""
                from llmling_agent.agent import Agent, StructuredAgent
                from llmling_agent.delegation.teamrun import TeamRun
        
                # Handle conversion of callables first
                if callable(other):
                    if has_return_type(other, str):
                        other = Agent.from_callback(other)
                    else:
                        other = StructuredAgent.from_callback(other)
                    other.context.pool = self.context.pool
        
                # If we're already a TeamRun, extend it
                if isinstance(self, TeamRun):
                    if self.validator:
                        # If we have a validator, create new TeamRun to preserve validation
                        return TeamRun([self, other])
                    self.agents.append(other)
                    return self
                # Otherwise create new TeamRun
                return TeamRun([self, other])
        
            @overload
            def __and__(self, other: Team[None]) -> Team[None]: ...
        
            @overload
            def __and__(self, other: Team[TDeps]) -> Team[TDeps]: ...
        
            @overload
            def __and__(self, other: Team[Any]) -> Team[Any]: ...
        
            @overload
            def __and__(self, other: AnyAgent[TDeps, Any]) -> Team[TDeps]: ...
        
            @overload
            def __and__(self, other: AnyAgent[Any, Any]) -> Team[Any]: ...
        
            def __and__(
                self, other: Team[Any] | AnyAgent[Any, Any] | ProcessorCallback[Any]
            ) -> Team[Any]:
                """Combine teams, preserving type safety for same types."""
                from llmling_agent.agent import Agent, StructuredAgent
                from llmling_agent.delegation.team import Team
        
                if callable(other):
                    if has_return_type(other, str):
                        other = Agent.from_callback(other)
                    else:
                        other = StructuredAgent.from_callback(other)
                    other.context.pool = self.context.pool
        
                match other:
                    case Team():
                        # Flatten when combining Teams
                        return Team([*self.agents, *other.agents])
                    case _:
                        # Everything else just becomes a member
                        return Team([*self.agents, other])
        
            @property
            def stats(self) -> AggregatedMessageStats:
                """Get aggregated stats from all team members."""
                return AggregatedMessageStats(stats=[agent.stats for agent in self.agents])
        
            @property
            def is_running(self) -> bool:
                """Whether execution is currently running."""
                return bool(self._main_task and not self._main_task.done())
        
            def is_busy(self) -> bool:
                """Check if team is processing any tasks."""
                return bool(self._pending_tasks or self._main_task)
        
            async def stop(self):
                """Stop background execution if running."""
                if self._main_task and not self._main_task.done():
                    self._main_task.cancel()
                    await self._main_task
                self._main_task = None
                await self.cleanup_tasks()
        
            async def wait(self) -> ChatMessage[Any] | None:
                """Wait for background execution to complete and return last message."""
                if not self._main_task:
                    msg = "No execution running"
                    raise RuntimeError(msg)
                if self._infinite:
                    msg = "Cannot wait on infinite execution"
                    raise RuntimeError(msg)
                try:
                    return await self._main_task
                finally:
                    await self.cleanup_tasks()
                    self._main_task = None
        
            async def run_in_background(
                self,
                *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                max_count: int | None = 1,  # 1 = single execution, None = indefinite
                interval: float = 1.0,
                **kwargs: Any,
            ) -> ExtendedTeamTalk:
                """Start execution in background.
        
                Args:
                    prompts: Prompts to execute
                    max_count: Maximum number of executions (None = run indefinitely)
                    interval: Seconds between executions
                    **kwargs: Additional args for execute()
                """
                if self._main_task:
                    msg = "Execution already running"
                    raise RuntimeError(msg)
                self._infinite = max_count is None
        
                async def _continuous() -> ChatMessage[Any] | None:
                    count = 0
                    last_message = None
                    while max_count is None or count < max_count:
                        try:
                            result = await self.execute(*prompts, **kwargs)
                            last_message = result[-1].message if result else None
                            count += 1
                            if max_count is None or count < max_count:
                                await asyncio.sleep(interval)
                        except asyncio.CancelledError:
                            logger.debug("Background execution cancelled")
                            break
                    return last_message
        
                self._main_task = self.create_task(_continuous(), name="main_execution")
                return self._team_talk
        
            @property
            def execution_stats(self) -> AggregatedTalkStats:
                """Get current execution statistics."""
                return self._team_talk.stats
        
            @property
            def talk(self) -> ExtendedTeamTalk:
                """Get current connection."""
                return self._team_talk
        
            @property
            def events(self) -> ListEvents:
                """Get events for the team."""
                return self.agents.events
        
            async def cancel(self):
                """Cancel execution and cleanup."""
                if self._main_task:
                    self._main_task.cancel()
                await self.cleanup_tasks()
        
            def get_structure_diagram(self) -> str:
                """Generate mermaid flowchart of node hierarchy."""
                lines = ["flowchart TD"]
        
                def add_node(node: MessageNode[Any, Any], parent: str | None = None):
                    """Recursively add node and its members to diagram."""
                    node_id = f"node_{id(node)}"
                    lines.append(f"    {node_id}[{node.name}]")
                    if parent:
                        lines.append(f"    {parent} --> {node_id}")
        
                    # If it's a team, recursively add its members
                    from llmling_agent.delegation.base_team import BaseTeam
        
                    if isinstance(node, BaseTeam):
                        for member in node.agents:
                            add_node(member, node_id)
        
                # Start with root nodes (team members)
                for node in self.agents:
                    add_node(node)
        
                return "\n".join(lines)
        
            def iter_agents(self) -> Iterator[AnyAgent[Any, Any]]:
                """Recursively iterate over all child agents."""
                from llmling_agent.agent import Agent, StructuredAgent
        
                for node in self.agents:
                    match node:
                        case BaseTeam():
                            yield from node.iter_agents()
                        case Agent() | StructuredAgent():
                            yield node
                        case _:
                            msg = f"Invalid node type: {type(node)}"
                            raise ValueError(msg)
        
            @property
            def context(self) -> TeamContext:
                """Get shared pool from team members.
        
                Raises:
                    ValueError: If team members belong to different pools
                """
                from llmling_agent.delegation.team import Team
        
                pool_ids: set[int] = set()
                shared_pool: AgentPool | None = None
                team_config: TeamConfig | None = None
        
                for agent in self.iter_agents():
                    if agent.context and agent.context.pool:
                        pool_id = id(agent.context.pool)
                        if pool_id not in pool_ids:
                            pool_ids.add(pool_id)
                            shared_pool = agent.context.pool
                            if shared_pool.manifest.teams:
                                team_config = shared_pool.manifest.teams.get(self.name)
                if not team_config:
                    mode = "parallel" if isinstance(self, Team) else "sequential"
                    team_config = TeamConfig(name=self.name, mode=mode, members=[])
                if not pool_ids:
                    logger.info("No pool found for team %s.", self.name)
                    return TeamContext(
                        node_name=self.name,
                        pool=shared_pool,
                        config=team_config,
                        definition=shared_pool.manifest if shared_pool else AgentsManifest(),
                    )
        
                if len(pool_ids) > 1:
                    msg = f"Team members in {self.name} belong to different pools"
                    raise ValueError(msg)
                return TeamContext(
                    node_name=self.name,
                    pool=shared_pool,
                    config=team_config,
                    definition=shared_pool.manifest if shared_pool else AgentsManifest(),
                )
        
            @context.setter
            def context(self, value: NodeContext):
                msg = "Cannot set context on BaseTeam"
                raise RuntimeError(msg)
        
            async def distribute(
                self,
                content: str,
                *,
                tools: list[str] | None = None,
                resources: list[str] | None = None,
                metadata: dict[str, Any] | None = None,
            ):
                """Distribute content and capabilities to all team members."""
                for agent in self.iter_agents():
                    # Add context message
                    agent.conversation.add_context_message(
                        content, source="distribution", metadata=metadata
                    )
        
                    # Register tools if provided
                    if tools:
                        for tool in tools:
                            agent.tools.register_tool(tool)
        
                    # Load resources if provided
                    if resources:
                        for resource in resources:
                            await agent.conversation.load_context_source(resource)
        
            @asynccontextmanager
            async def temporary_state(
                self,
                *,
                system_prompts: list[AnyPromptType] | None = None,
                replace_prompts: bool = False,
                tools: list[ToolType] | None = None,
                replace_tools: bool = False,
                history: list[AnyPromptType] | SessionQuery | None = None,
                replace_history: bool = False,
                pause_routing: bool = False,
                model: ModelType | None = None,
                provider: AgentProvider | None = None,
            ) -> AsyncIterator[Self]:
                """Temporarily modify state of all agents in the team.
        
                All agents in the team will enter their temporary state simultaneously.
        
                Args:
                    system_prompts: Temporary system prompts to use
                    replace_prompts: Whether to replace existing prompts
                    tools: Temporary tools to make available
                    replace_tools: Whether to replace existing tools
                    history: Conversation history (prompts or query)
                    replace_history: Whether to replace existing history
                    pause_routing: Whether to pause message routing
                    model: Temporary model override
                    provider: Temporary provider override
                """
                # Get all agents (flattened) before entering context
                agents = list(self.iter_agents())
        
                async with AsyncExitStack() as stack:
                    if pause_routing:
                        await stack.enter_async_context(self.connections.paused_routing())
                    # Enter temporary state for all agents
                    for agent in agents:
                        await stack.enter_async_context(
                            agent.temporary_state(
                                system_prompts=system_prompts,
                                replace_prompts=replace_prompts,
                                tools=tools,
                                replace_tools=replace_tools,
                                history=history,
                                replace_history=replace_history,
                                pause_routing=pause_routing,
                                model=model,
                                provider=provider,
                            )
                        )
                    try:
                        yield self
                    finally:
                        # AsyncExitStack will handle cleanup of all states
                        pass
        
            @abstractmethod
            async def execute(
                self,
                *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                **kwargs: Any,
            ) -> TeamResponse: ...
        
            def run_sync(
                self,
                *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str],
                store_history: bool = True,
            ) -> ChatMessage[TResult]:
                """Run agent synchronously (convenience wrapper).
        
                Args:
                    prompt: User query or instruction
                    store_history: Whether the message exchange should be added to the
                                   context window
                Returns:
                    Result containing response and run information
                """
                coro = self.run(*prompt, store_history=store_history)
                return self.run_task_sync(coro)
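
The `run_in_background` / `wait` lifecycle above (start a task, refuse double starts, await and clear it) can be sketched standalone. `Runner` is an illustrative stand-in, with a counter in place of real `execute()` rounds:

```python
import asyncio

class Runner:
    # Standalone sketch of the run_in_background / wait lifecycle above.
    def __init__(self):
        self._main_task: asyncio.Task | None = None

    def run_in_background(self, *, max_count: int = 3) -> None:
        # Refuse to start twice, mirroring the RuntimeError above.
        if self._main_task:
            raise RuntimeError("Execution already running")
        self._main_task = asyncio.ensure_future(self._continuous(max_count))

    async def _continuous(self, max_count: int) -> int:
        count = 0
        while count < max_count:
            count += 1            # stand-in for one execute() round
            await asyncio.sleep(0)
        return count

    async def wait(self) -> int:
        # Await the background task, then clear it so a new run can start.
        if not self._main_task:
            raise RuntimeError("No execution running")
        try:
            return await self._main_task
        finally:
            self._main_task = None

async def main():
    runner = Runner()
    runner.run_in_background(max_count=3)
    return await runner.wait()

print(asyncio.run(main()))  # → 3
```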
        

        context property writable

        context: TeamContext
        

        Get shared pool from team members.

        Raises:

        ValueError: If team members belong to different pools

        events property

        events: ListEvents
        

        Get events for the team.

        execution_stats property

        execution_stats: AggregatedTalkStats
        

        Get current execution statistics.

        is_running property

        is_running: bool
        

        Whether execution is currently running.

        stats property

        stats: AggregatedMessageStats
        

        Get aggregated stats from all team members.

        talk property

        talk: ExtendedTeamTalk
        

        Get current connection.

        __and__

        __and__(other: Team[None]) -> Team[None]
        
        __and__(other: Team[TDeps]) -> Team[TDeps]
        
        __and__(other: Team[Any]) -> Team[Any]
        
        __and__(other: AnyAgent[TDeps, Any]) -> Team[TDeps]
        
        __and__(other: AnyAgent[Any, Any]) -> Team[Any]
        
        __and__(other: Team[Any] | AnyAgent[Any, Any] | ProcessorCallback[Any]) -> Team[Any]
        

        Combine teams, preserving type safety for same types.

        Source code in src/llmling_agent/delegation/base_team.py
        def __and__(
            self, other: Team[Any] | AnyAgent[Any, Any] | ProcessorCallback[Any]
        ) -> Team[Any]:
            """Combine teams, preserving type safety for same types."""
            from llmling_agent.agent import Agent, StructuredAgent
            from llmling_agent.delegation.team import Team
        
            if callable(other):
                if has_return_type(other, str):
                    other = Agent.from_callback(other)
                else:
                    other = StructuredAgent.from_callback(other)
                other.context.pool = self.context.pool
        
            match other:
                case Team():
                    # Flatten when combining Teams
                    return Team([*self.agents, *other.agents])
                case _:
                    # Everything else just becomes a member
                    return Team([*self.agents, other])
        

        __getitem__

        __getitem__(index_or_name: int | str) -> MessageNode[TDeps, TResult]
        

        Get team member by index or name.

        Source code in src/llmling_agent/delegation/base_team.py
        def __getitem__(self, index_or_name: int | str) -> MessageNode[TDeps, TResult]:
            """Get team member by index or name."""
            if isinstance(index_or_name, str):
                return next(agent for agent in self.agents if agent.name == index_or_name)
            return self.agents[index_or_name]
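The dual lookup accepts either a list index or a member name. A quick behavioral sketch with stand-in classes (not the real `MessageNode` types):

```python
from dataclasses import dataclass


@dataclass
class Member:
    name: str


class Roster:
    def __init__(self, members):
        self.members = list(members)

    def __getitem__(self, index_or_name):
        # A string looks the member up by name; anything else is list indexing.
        if isinstance(index_or_name, str):
            return next(m for m in self.members if m.name == index_or_name)
        return self.members[index_or_name]


roster = Roster([Member("analyzer"), Member("critic")])
print(roster["critic"].name, roster[0].name)  # critic analyzer
```

Note that, as in the source above, a name with no match raises `StopIteration` from `next()` rather than a `KeyError`.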
        

        __init__

        __init__(
            agents: Sequence[MessageNode[TDeps, TResult]],
            *,
            name: str | None = None,
            description: str | None = None,
            shared_prompt: str | None = None,
            mcp_servers: list[str | MCPServerConfig] | None = None,
            picker: AnyAgent[Any, Any] | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        )
        

        Common variables only for typing.

        Source code in src/llmling_agent/delegation/base_team.py
        def __init__(
            self,
            agents: Sequence[MessageNode[TDeps, TResult]],
            *,
            name: str | None = None,
            description: str | None = None,
            shared_prompt: str | None = None,
            mcp_servers: list[str | MCPServerConfig] | None = None,
            picker: AnyAgent[Any, Any] | None = None,
            num_picks: int | None = None,
            pick_prompt: str | None = None,
        ):
            """Common variables only for typing."""
            from llmling_agent.delegation.teamrun import ExtendedTeamTalk
        
            self._name = name or " & ".join([i.name for i in agents])
            self.agents = EventedList[MessageNode]()
            self.agents.events.inserted.connect(self._on_node_added)
            self.agents.events.removed.connect(self._on_node_removed)
            self.agents.events.changed.connect(self._on_node_changed)
            super().__init__(
                name=self._name,
                context=self.context,
                mcp_servers=mcp_servers,
                description=description,
            )
            self.agents.extend(list(agents))
            self._team_talk = ExtendedTeamTalk()
            self.shared_prompt = shared_prompt
            self._main_task: asyncio.Task[Any] | None = None
            self._infinite = False
            self.picker = picker
            self.num_picks = num_picks
            self.pick_prompt = pick_prompt
        

        __iter__

        __iter__() -> Iterator[MessageNode[TDeps, TResult]]
        

        Iterate over team members.

        Source code in src/llmling_agent/delegation/base_team.py
        def __iter__(self) -> Iterator[MessageNode[TDeps, TResult]]:
            """Iterate over team members."""
            return iter(self.agents)
        

        __len__

        __len__() -> int
        

        Get number of team members.

        Source code in src/llmling_agent/delegation/base_team.py
        def __len__(self) -> int:
            """Get number of team members."""
            return len(self.agents)
        

        __or__

        __or__(
            other: AnyAgent[Any, Any] | ProcessorCallback[Any] | BaseTeam[Any, Any],
        ) -> TeamRun[Any, Any]
        

        Create a sequential pipeline.

        Source code in src/llmling_agent/delegation/base_team.py
        def __or__(
            self,
            other: AnyAgent[Any, Any] | ProcessorCallback[Any] | BaseTeam[Any, Any],
        ) -> TeamRun[Any, Any]:
            """Create a sequential pipeline."""
            from llmling_agent.agent import Agent, StructuredAgent
            from llmling_agent.delegation.teamrun import TeamRun
        
            # Handle conversion of callables first
            if callable(other):
                if has_return_type(other, str):
                    other = Agent.from_callback(other)
                else:
                    other = StructuredAgent.from_callback(other)
                other.context.pool = self.context.pool
        
            # If we're already a TeamRun, extend it
            if isinstance(self, TeamRun):
                if self.validator:
                    # If we have a validator, create new TeamRun to preserve validation
                    return TeamRun([self, other])
                self.agents.append(other)
                return self
            # Otherwise create new TeamRun
            return TeamRun([self, other])
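The `|` operator thus grows a flat sequential pipeline: chaining onto an existing run extends it in place instead of producing nested wrappers. A rough stdlib sketch of that growth rule (the `Node`/`Pipeline` names are hypothetical simplifications of `MessageNode`/`TeamRun`):

```python
class Node:
    def __or__(self, other):
        # A plain node starts a new pipeline.
        return Pipeline([self, other])


class Pipeline(Node):
    def __init__(self, steps):
        self.steps = list(steps)

    def __or__(self, other):
        # An existing pipeline appends in place rather than nesting.
        self.steps.append(other)
        return self


a, b, c = Node(), Node(), Node()
pipe = a | b | c
print(len(pipe.steps))  # 3
```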
        

        __repr__

        __repr__() -> str
        

        Create readable representation.

        Source code in src/llmling_agent/delegation/base_team.py
        def __repr__(self) -> str:
            """Create readable representation."""
            members = ", ".join(agent.name for agent in self.agents)
            name = f" ({self.name})" if self.name else ""
            return f"{self.__class__.__name__}[{len(self.agents)}]{name}: {members}"
        

        _on_node_added

        _on_node_added(index: int, node: MessageNode[Any, Any])
        

        Handler for adding nodes to the team.

        Source code in src/llmling_agent/delegation/base_team.py
        def _on_node_added(self, index: int, node: MessageNode[Any, Any]):
            """Handler for adding nodes to the team."""
            from llmling_agent.agent import Agent, StructuredAgent
        
            if isinstance(node, Agent | StructuredAgent):
                node.tools.add_provider(self.mcp)
        

        _on_node_changed

        _on_node_changed(index: int, old: MessageNode, new: MessageNode)
        

        Handle node replacement in the agents list.

        Source code in src/llmling_agent/delegation/base_team.py
        def _on_node_changed(self, index: int, old: MessageNode, new: MessageNode):
            """Handle node replacement in the agents list."""
            self._on_node_removed(index, old)
            self._on_node_added(index, new)
        

        _on_node_removed

        _on_node_removed(index: int, node: MessageNode[Any, Any])
        

        Handler for removing nodes from the team.

        Source code in src/llmling_agent/delegation/base_team.py
        def _on_node_removed(self, index: int, node: MessageNode[Any, Any]):
            """Handler for removing nodes from the team."""
            from llmling_agent.agent import Agent, StructuredAgent
        
            if isinstance(node, Agent | StructuredAgent):
                node.tools.remove_provider(self.mcp)
        

        cancel async

        cancel()
        

        Cancel execution and cleanup.

        Source code in src/llmling_agent/delegation/base_team.py
        async def cancel(self):
            """Cancel execution and cleanup."""
            if self._main_task:
                self._main_task.cancel()
            await self.cleanup_tasks()
        

        distribute async

        distribute(
            content: str,
            *,
            tools: list[str] | None = None,
            resources: list[str] | None = None,
            metadata: dict[str, Any] | None = None,
        )
        

        Distribute content and capabilities to all team members.

        Source code in src/llmling_agent/delegation/base_team.py
        async def distribute(
            self,
            content: str,
            *,
            tools: list[str] | None = None,
            resources: list[str] | None = None,
            metadata: dict[str, Any] | None = None,
        ):
            """Distribute content and capabilities to all team members."""
            for agent in self.iter_agents():
                # Add context message
                agent.conversation.add_context_message(
                    content, source="distribution", metadata=metadata
                )
        
                # Register tools if provided
                if tools:
                    for tool in tools:
                        agent.tools.register_tool(tool)
        
                # Load resources if provided
                if resources:
                    for resource in resources:
                        await agent.conversation.load_context_source(resource)
        

        get_structure_diagram

        get_structure_diagram() -> str
        

        Generate mermaid flowchart of node hierarchy.

        Source code in src/llmling_agent/delegation/base_team.py
        def get_structure_diagram(self) -> str:
            """Generate mermaid flowchart of node hierarchy."""
            lines = ["flowchart TD"]
        
            def add_node(node: MessageNode[Any, Any], parent: str | None = None):
                """Recursively add node and its members to diagram."""
                node_id = f"node_{id(node)}"
                lines.append(f"    {node_id}[{node.name}]")
                if parent:
                    lines.append(f"    {parent} --> {node_id}")
        
                # If it's a team, recursively add its members
                from llmling_agent.delegation.base_team import BaseTeam
        
                if isinstance(node, BaseTeam):
                    for member in node.agents:
                        add_node(member, node_id)
        
            # Start with root nodes (team members)
            for node in self.agents:
                add_node(node)
        
            return "\n".join(lines)
        

        is_busy

        is_busy() -> bool
        

        Check if team is processing any tasks.

        Source code in src/llmling_agent/delegation/base_team.py
        def is_busy(self) -> bool:
            """Check if team is processing any tasks."""
            return bool(self._pending_tasks or self._main_task)
        

        iter_agents

        iter_agents() -> Iterator[AnyAgent[Any, Any]]
        

        Recursively iterate over all child agents.

        Source code in src/llmling_agent/delegation/base_team.py
        def iter_agents(self) -> Iterator[AnyAgent[Any, Any]]:
            """Recursively iterate over all child agents."""
            from llmling_agent.agent import Agent, StructuredAgent
        
            for node in self.agents:
                match node:
                    case BaseTeam():
                        yield from node.iter_agents()
                    case Agent() | StructuredAgent():
                        yield node
                    case _:
                        msg = f"Invalid node type: {type(node)}"
                        raise ValueError(msg)
        

        pick_agents async

        pick_agents(task: str) -> Sequence[MessageNode[Any, Any]]
        

        Pick agents to run.

        Source code in src/llmling_agent/delegation/base_team.py
        async def pick_agents(self, task: str) -> Sequence[MessageNode[Any, Any]]:
            """Pick agents to run."""
            if self.picker:
                if self.num_picks == 1:
                    result = await self.picker.talk.pick(self, task, self.pick_prompt)
                    return [result.selection]
                result = await self.picker.talk.pick_multiple(
                    self,
                    task,
                    min_picks=self.num_picks or 1,
                    max_picks=self.num_picks,
                    prompt=self.pick_prompt,
                )
                return result.selections
            return list(self.agents)
        

        run_in_background async

        run_in_background(
            *prompts: AnyPromptType | Image | PathLike[str] | None,
            max_count: int | None = 1,
            interval: float = 1.0,
            **kwargs: Any,
        ) -> ExtendedTeamTalk
        

        Start execution in background.

Parameters:

    prompts (AnyPromptType | Image | PathLike[str] | None):
        Prompts to execute. Default: ()
    max_count (int | None):
        Maximum number of executions (None = run indefinitely). Default: 1
    interval (float):
        Seconds between executions. Default: 1.0
    **kwargs (Any):
        Additional args for execute(). Default: {}
        Source code in src/llmling_agent/delegation/base_team.py
        async def run_in_background(
            self,
            *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
            max_count: int | None = 1,  # 1 = single execution, None = indefinite
            interval: float = 1.0,
            **kwargs: Any,
        ) -> ExtendedTeamTalk:
            """Start execution in background.
        
            Args:
                prompts: Prompts to execute
                max_count: Maximum number of executions (None = run indefinitely)
                interval: Seconds between executions
                **kwargs: Additional args for execute()
            """
            if self._main_task:
                msg = "Execution already running"
                raise RuntimeError(msg)
            self._infinite = max_count is None
        
            async def _continuous() -> ChatMessage[Any] | None:
                count = 0
                last_message = None
                while max_count is None or count < max_count:
                    try:
                        result = await self.execute(*prompts, **kwargs)
                        last_message = result[-1].message if result else None
                        count += 1
                        if max_count is None or count < max_count:
                            await asyncio.sleep(interval)
                    except asyncio.CancelledError:
                        logger.debug("Background execution cancelled")
                        break
                return last_message
        
            self._main_task = self.create_task(_continuous(), name="main_execution")
            return self._team_talk
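The background loop above is a plain asyncio task that repeats the execution up to `max_count` times, sleeping `interval` seconds between runs and returning the last result. The same shape, reduced to the stdlib (the `work` callback is a stand-in for `execute`):

```python
import asyncio


async def run_periodically(work, *, max_count=None, interval=0.01):
    """Repeat `work` until cancelled or max_count runs; return the last result."""
    count = 0
    last = None
    while max_count is None or count < max_count:
        try:
            last = await work()
            count += 1
            # Only sleep if another iteration will follow
            if max_count is None or count < max_count:
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            break  # stop cleanly when the owning task is cancelled
    return last


async def main():
    results = []

    async def work():
        results.append(len(results))
        return results[-1]

    task = asyncio.create_task(run_periodically(work, max_count=3))
    return await task, results


print(asyncio.run(main()))  # (2, [0, 1, 2])
```

Catching `CancelledError` inside the loop is what lets `stop()`/`cancel()` end an indefinite run without the task raising.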
        

        run_sync

        run_sync(
            *prompt: AnyPromptType | Image | PathLike[str], store_history: bool = True
        ) -> ChatMessage[TResult]
        

        Run agent synchronously (convenience wrapper).

        Parameters:

        Name Type Description Default
        prompt AnyPromptType | Image | PathLike[str]

        User query or instruction

        ()
        store_history bool

        Whether the message exchange should be added to the context window

        True

        Returns: Result containing response and run information

        Source code in src/llmling_agent/delegation/base_team.py
        def run_sync(
            self,
            *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str],
            store_history: bool = True,
        ) -> ChatMessage[TResult]:
            """Run agent synchronously (convenience wrapper).
        
            Args:
                prompt: User query or instruction
                store_history: Whether the message exchange should be added to the
                               context window
            Returns:
                Result containing response and run information
            """
            coro = self.run(*prompt, store_history=store_history)
            return self.run_task_sync(coro)
        

        stop async

        stop()
        

        Stop background execution if running.

        Source code in src/llmling_agent/delegation/base_team.py
        async def stop(self):
            """Stop background execution if running."""
            if self._main_task and not self._main_task.done():
                self._main_task.cancel()
                await self._main_task
            self._main_task = None
            await self.cleanup_tasks()
        

        temporary_state async

        temporary_state(
            *,
            system_prompts: list[AnyPromptType] | None = None,
            replace_prompts: bool = False,
            tools: list[ToolType] | None = None,
            replace_tools: bool = False,
            history: list[AnyPromptType] | SessionQuery | None = None,
            replace_history: bool = False,
            pause_routing: bool = False,
            model: ModelType | None = None,
            provider: AgentProvider | None = None,
        ) -> AsyncIterator[Self]
        

        Temporarily modify state of all agents in the team.

        All agents in the team will enter their temporary state simultaneously.

Parameters:

    system_prompts (list[AnyPromptType] | None):
        Temporary system prompts to use. Default: None
    replace_prompts (bool):
        Whether to replace existing prompts. Default: False
    tools (list[ToolType] | None):
        Temporary tools to make available. Default: None
    replace_tools (bool):
        Whether to replace existing tools. Default: False
    history (list[AnyPromptType] | SessionQuery | None):
        Conversation history (prompts or query). Default: None
    replace_history (bool):
        Whether to replace existing history. Default: False
    pause_routing (bool):
        Whether to pause message routing. Default: False
    model (ModelType | None):
        Temporary model override. Default: None
    provider (AgentProvider | None):
        Temporary provider override. Default: None
        Source code in src/llmling_agent/delegation/base_team.py
        @asynccontextmanager
        async def temporary_state(
            self,
            *,
            system_prompts: list[AnyPromptType] | None = None,
            replace_prompts: bool = False,
            tools: list[ToolType] | None = None,
            replace_tools: bool = False,
            history: list[AnyPromptType] | SessionQuery | None = None,
            replace_history: bool = False,
            pause_routing: bool = False,
            model: ModelType | None = None,
            provider: AgentProvider | None = None,
        ) -> AsyncIterator[Self]:
            """Temporarily modify state of all agents in the team.
        
            All agents in the team will enter their temporary state simultaneously.
        
            Args:
                system_prompts: Temporary system prompts to use
                replace_prompts: Whether to replace existing prompts
                tools: Temporary tools to make available
                replace_tools: Whether to replace existing tools
                history: Conversation history (prompts or query)
                replace_history: Whether to replace existing history
                pause_routing: Whether to pause message routing
                model: Temporary model override
                provider: Temporary provider override
            """
            # Get all agents (flattened) before entering context
            agents = list(self.iter_agents())
        
            async with AsyncExitStack() as stack:
                if pause_routing:
                    await stack.enter_async_context(self.connections.paused_routing())
                # Enter temporary state for all agents
                for agent in agents:
                    await stack.enter_async_context(
                        agent.temporary_state(
                            system_prompts=system_prompts,
                            replace_prompts=replace_prompts,
                            tools=tools,
                            replace_tools=replace_tools,
                            history=history,
                            replace_history=replace_history,
                            pause_routing=pause_routing,
                            model=model,
                            provider=provider,
                        )
                    )
                try:
                    yield self
                finally:
                    # AsyncExitStack will handle cleanup of all states
                    pass
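The `AsyncExitStack` is what guarantees every member's temporary state unwinds, in reverse order, even if the body raises. A condensed sketch of the pattern with dummy context managers (names illustrative):

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def temp_state(name: str):
    events.append(f"enter {name}")
    try:
        yield
    finally:
        events.append(f"exit {name}")


async def with_all(names):
    # Enter every member's temporary state; the stack unwinds them all
    # in reverse order when the block exits, even on error.
    async with AsyncExitStack() as stack:
        for name in names:
            await stack.enter_async_context(temp_state(name))
        events.append("body")


asyncio.run(with_all(["a", "b"]))
print(events)  # ['enter a', 'enter b', 'body', 'exit b', 'exit a']
```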
        

        wait async

        wait() -> ChatMessage[Any] | None
        

        Wait for background execution to complete and return last message.

        Source code in src/llmling_agent/delegation/base_team.py
        async def wait(self) -> ChatMessage[Any] | None:
            """Wait for background execution to complete and return last message."""
            if not self._main_task:
                msg = "No execution running"
                raise RuntimeError(msg)
            if self._infinite:
                msg = "Cannot wait on infinite execution"
                raise RuntimeError(msg)
            try:
                return await self._main_task
            finally:
                await self.cleanup_tasks()
                self._main_task = None
        

        Team

        Bases: BaseTeam[TDeps, Any]

        Group of agents that can execute together.

        Source code in src/llmling_agent/delegation/team.py
        class Team[TDeps](BaseTeam[TDeps, Any]):
            """Group of agents that can execute together."""
        
            async def execute(
                self,
                *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                **kwargs: Any,
            ) -> TeamResponse:
                """Run all agents in parallel with monitoring."""
                from llmling_agent.talk.talk import Talk
        
                self._team_talk.clear()
        
                start_time = datetime.now()
                responses: list[AgentResponse[Any]] = []
                errors: dict[str, Exception] = {}
                final_prompt = list(prompts)
                if self.shared_prompt:
                    final_prompt.insert(0, self.shared_prompt)
                combined_prompt = "\n".join([await to_prompt(p) for p in final_prompt])
                all_nodes = list(await self.pick_agents(combined_prompt))
                # Create Talk connections for monitoring this execution
                execution_talks: list[Talk[Any]] = []
                for node in all_nodes:
                    talk = Talk[Any](
                        node,
                        [],  # No actual forwarding, just for tracking
                        connection_type="run",
                        queued=True,
                        queue_strategy="latest",
                    )
                    execution_talks.append(talk)
                    self._team_talk.append(talk)  # Add to base class's TeamTalk
        
                async def _run(node: MessageNode[TDeps, Any]):
                    try:
                        start = perf_counter()
                        message = await node.run(*final_prompt, **kwargs)
                        timing = perf_counter() - start
                        r = AgentResponse(agent_name=node.name, message=message, timing=timing)
                        responses.append(r)
        
                        # Update talk stats for this agent
                        talk = next(t for t in execution_talks if t.source == node)
                        talk._stats.messages.append(message)
        
                    except Exception as e:  # noqa: BLE001
                        errors[node.name] = e
        
                # Run all agents in parallel
                await asyncio.gather(*[_run(node) for node in all_nodes])
        
                return TeamResponse(responses=responses, start_time=start_time, errors=errors)
        
            def __prompt__(self) -> str:
                """Format team info for prompts."""
                members = ", ".join(a.name for a in self.agents)
                desc = f" - {self.description}" if self.description else ""
                return f"Parallel Team '{self.name}'{desc}\nMembers: {members}"
        
            async def run_iter(
                self,
                *prompts: AnyPromptType,
                **kwargs: Any,
            ) -> AsyncIterator[ChatMessage[Any]]:
                """Yield messages as they arrive from parallel execution."""
                queue: asyncio.Queue[ChatMessage[Any] | None] = asyncio.Queue()
                failures: dict[str, Exception] = {}
        
                async def _run(node: MessageNode[TDeps, Any]):
                    try:
                        message = await node.run(*prompts, **kwargs)
                        await queue.put(message)
                    except Exception as e:
                        logger.exception("Error executing node %s", node.name)
                        failures[node.name] = e
                        # Put None to maintain queue count
                        await queue.put(None)
        
                # Get nodes to run
                combined_prompt = "\n".join([await to_prompt(p) for p in prompts])
                all_nodes = list(await self.pick_agents(combined_prompt))
        
                # Start all agents
                tasks = [asyncio.create_task(_run(n), name=f"run_{n.name}") for n in all_nodes]
        
                try:
                    # Yield messages as they arrive
                    for _ in all_nodes:
                        if msg := await queue.get():
                            yield msg
        
                    # If any failures occurred, raise error with details
                    if failures:
                        error_details = "\n".join(
                            f"- {name}: {error}" for name, error in failures.items()
                        )
                        error_msg = f"Some nodes failed to execute:\n{error_details}"
                        raise RuntimeError(error_msg)
        
                finally:
                    # Clean up any remaining tasks
                    for task in tasks:
                        if not task.done():
                            task.cancel()
        
            async def _run(
                self,
                *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                wait_for_connections: bool | None = None,
                message_id: str | None = None,
                conversation_id: str | None = None,
                **kwargs: Any,
            ) -> ChatMessage[list[Any]]:
                """Run all agents in parallel and return combined message."""
                result: TeamResponse = await self.execute(*prompts, **kwargs)
                message_id = message_id or str(uuid4())
                return ChatMessage(
                    content=[r.message.content for r in result if r.message],
                    role="assistant",
                    name=self.name,
                    message_id=message_id,
                    conversation_id=conversation_id,
                    metadata={
                        "agent_names": [r.agent_name for r in result],
                        "errors": {name: str(error) for name, error in result.errors.items()},
                        "start_time": result.start_time.isoformat(),
                    },
                )
        
            async def run_job[TJobResult](
                self,
                job: Job[TDeps, TJobResult],
                *,
                store_history: bool = True,
                include_agent_tools: bool = True,
            ) -> list[AgentResponse[TJobResult]]:
                """Execute a job across all team members in parallel.
        
                Args:
                    job: Job configuration to execute
                    store_history: Whether to add job execution to conversation history
                    include_agent_tools: Whether to include agent's tools alongside job tools
        
                Returns:
                    List of responses from all agents
        
                Raises:
                    JobError: If job execution fails for any agent
                    ValueError: If job configuration is invalid
                """
                from llmling_agent.agent import Agent, StructuredAgent
                from llmling_agent.tasks import JobError
        
                responses: list[AgentResponse[TJobResult]] = []
                errors: dict[str, Exception] = {}
                start_time = datetime.now()
        
                # Validate dependencies for all agents
                if job.required_dependency is not None:
                    invalid_agents = [
                        agent.name
                        for agent in self.iter_agents()
                        if not isinstance(agent.context.data, job.required_dependency)
                    ]
                    if invalid_agents:
                        msg = (
                            f"Agents {', '.join(invalid_agents)} don't have required "
                            f"dependency type: {job.required_dependency}"
                        )
                        raise JobError(msg)
        
                try:
                    # Load knowledge for all agents if provided
                    if job.knowledge:
                        # TODO: resources
                        tools = [t.name for t in job.get_tools()]
                        await self.distribute(content="", tools=tools)
        
                    prompt = await job.get_prompt()
        
                    async def _run(agent: MessageNode[TDeps, TJobResult]):
                        assert isinstance(agent, Agent | StructuredAgent)
                        try:
                            with agent.tools.temporary_tools(
                                job.get_tools(), exclusive=not include_agent_tools
                            ):
                                start = perf_counter()
                                resp = AgentResponse(
                                    agent_name=agent.name,
                                    message=await agent.run(prompt, store_history=store_history),  # pyright: ignore
                                    timing=perf_counter() - start,
                                )
                                responses.append(resp)
                        except Exception as e:  # noqa: BLE001
                            errors[agent.name] = e
        
                    # Run job in parallel on all agents
                    await asyncio.gather(*[_run(node) for node in self.agents])
        
                    return TeamResponse(responses=responses, start_time=start_time, errors=errors)
        
                except Exception as e:
                    msg = "Job execution failed"
                    logger.exception(msg)
                    raise JobError(msg) from e
        

        __prompt__

        __prompt__() -> str
        

        Format team info for prompts.

        Source code in src/llmling_agent/delegation/team.py
        def __prompt__(self) -> str:
            """Format team info for prompts."""
            members = ", ".join(a.name for a in self.agents)
            desc = f" - {self.description}" if self.description else ""
            return f"Parallel Team '{self.name}'{desc}\nMembers: {members}"
        

        _run async

        _run(
            *prompts: AnyPromptType | Image | PathLike[str] | None,
            wait_for_connections: bool | None = None,
            message_id: str | None = None,
            conversation_id: str | None = None,
            **kwargs: Any,
        ) -> ChatMessage[list[Any]]
        

        Run all agents in parallel and return combined message.

        Source code in src/llmling_agent/delegation/team.py
        async def _run(
            self,
            *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
            wait_for_connections: bool | None = None,
            message_id: str | None = None,
            conversation_id: str | None = None,
            **kwargs: Any,
        ) -> ChatMessage[list[Any]]:
            """Run all agents in parallel and return combined message."""
            result: TeamResponse = await self.execute(*prompts, **kwargs)
            message_id = message_id or str(uuid4())
            return ChatMessage(
                content=[r.message.content for r in result if r.message],
                role="assistant",
                name=self.name,
                message_id=message_id,
                conversation_id=conversation_id,
                metadata={
                    "agent_names": [r.agent_name for r in result],
                    "errors": {name: str(error) for name, error in result.errors.items()},
                    "start_time": result.start_time.isoformat(),
                },
            )
        

        execute async

        execute(
            *prompts: AnyPromptType | Image | PathLike[str] | None, **kwargs: Any
        ) -> TeamResponse
        

        Run all agents in parallel with monitoring.

        Source code in src/llmling_agent/delegation/team.py
        async def execute(
            self,
            *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
            **kwargs: Any,
        ) -> TeamResponse:
            """Run all agents in parallel with monitoring."""
            from llmling_agent.talk.talk import Talk
        
            self._team_talk.clear()
        
            start_time = datetime.now()
            responses: list[AgentResponse[Any]] = []
            errors: dict[str, Exception] = {}
            final_prompt = list(prompts)
            if self.shared_prompt:
                final_prompt.insert(0, self.shared_prompt)
            combined_prompt = "\n".join([await to_prompt(p) for p in final_prompt])
            all_nodes = list(await self.pick_agents(combined_prompt))
            # Create Talk connections for monitoring this execution
            execution_talks: list[Talk[Any]] = []
            for node in all_nodes:
                talk = Talk[Any](
                    node,
                    [],  # No actual forwarding, just for tracking
                    connection_type="run",
                    queued=True,
                    queue_strategy="latest",
                )
                execution_talks.append(talk)
                self._team_talk.append(talk)  # Add to base class's TeamTalk
        
            async def _run(node: MessageNode[TDeps, Any]):
                try:
                    start = perf_counter()
                    message = await node.run(*final_prompt, **kwargs)
                    timing = perf_counter() - start
                    r = AgentResponse(agent_name=node.name, message=message, timing=timing)
                    responses.append(r)
        
                    # Update talk stats for this agent
                    talk = next(t for t in execution_talks if t.source == node)
                    talk._stats.messages.append(message)
        
                except Exception as e:  # noqa: BLE001
                    errors[node.name] = e
        
            # Run all agents in parallel
            await asyncio.gather(*[_run(node) for node in all_nodes])
        
            return TeamResponse(responses=responses, start_time=start_time, errors=errors)
        
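The fan-out pattern used here can be demonstrated stand-alone: each node runs inside its own try/except, so one failure cannot cancel its siblings, and errors are collected by name rather than raised. This sketch uses a fake agent coroutine in place of real `MessageNode.run` calls:

```python
# Error-isolating parallel fan-out, as used by Team.execute.
import asyncio
from time import perf_counter

async def fake_agent(name: str) -> str:
    if name == "bad":
        raise RuntimeError("boom")
    return f"{name}: done"

async def run_all(names: list[str]) -> tuple[dict[str, str], dict[str, Exception]]:
    results: dict[str, str] = {}
    errors: dict[str, Exception] = {}

    async def _run(name: str) -> None:
        try:
            start = perf_counter()
            results[name] = await fake_agent(name)
            _ = perf_counter() - start  # per-response timing in the real code
        except Exception as e:  # collect instead of propagating
            errors[name] = e

    # gather never sees an exception, so all siblings run to completion
    await asyncio.gather(*[_run(n) for n in names])
    return results, errors

results, errors = asyncio.run(run_all(["good", "bad"]))
```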

        run_iter async

        run_iter(*prompts: AnyPromptType, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]
        

        Yield messages as they arrive from parallel execution.

        Source code in src/llmling_agent/delegation/team.py
        async def run_iter(
            self,
            *prompts: AnyPromptType,
            **kwargs: Any,
        ) -> AsyncIterator[ChatMessage[Any]]:
            """Yield messages as they arrive from parallel execution."""
            queue: asyncio.Queue[ChatMessage[Any] | None] = asyncio.Queue()
            failures: dict[str, Exception] = {}
        
            async def _run(node: MessageNode[TDeps, Any]):
                try:
                    message = await node.run(*prompts, **kwargs)
                    await queue.put(message)
                except Exception as e:
                    logger.exception("Error executing node %s", node.name)
                    failures[node.name] = e
                    # Put None to maintain queue count
                    await queue.put(None)
        
            # Get nodes to run
            combined_prompt = "\n".join([await to_prompt(p) for p in prompts])
            all_nodes = list(await self.pick_agents(combined_prompt))
        
            # Start all agents
            tasks = [asyncio.create_task(_run(n), name=f"run_{n.name}") for n in all_nodes]
        
            try:
                # Yield messages as they arrive
                for _ in all_nodes:
                    if msg := await queue.get():
                        yield msg
        
                # If any failures occurred, raise error with details
                if failures:
                    error_details = "\n".join(
                        f"- {name}: {error}" for name, error in failures.items()
                    )
                    error_msg = f"Some nodes failed to execute:\n{error_details}"
                    raise RuntimeError(error_msg)
        
            finally:
                # Clean up any remaining tasks
                for task in tasks:
                    if not task.done():
                        task.cancel()
        
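The queue fan-in pattern `run_iter` relies on can be shown in miniature: every task puts exactly one item on the queue (a result, or a `None` sentinel on failure), so draining the queue once per task yields results in completion order without losing count. The worker names and delays below are illustrative:

```python
# Queue fan-in with a None sentinel per failed task, as in Team.run_iter.
import asyncio

async def worker(name: str, delay: float, queue: asyncio.Queue) -> None:
    try:
        await asyncio.sleep(delay)
        await queue.put(f"{name}: ok")
    except Exception:
        await queue.put(None)  # sentinel keeps the queue count balanced

async def fan_in() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    specs = [("slow", 0.02), ("fast", 0.0)]
    tasks = [asyncio.create_task(worker(n, d, queue)) for n, d in specs]
    received: list[str] = []
    try:
        # One get() per task; messages arrive in completion order.
        for _ in tasks:
            if (msg := await queue.get()) is not None:
                received.append(msg)
    finally:
        for t in tasks:
            if not t.done():
                t.cancel()
    return received

out = asyncio.run(fan_in())
```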

        run_job async

        run_job(
            job: Job[TDeps, TJobResult],
            *,
            store_history: bool = True,
            include_agent_tools: bool = True,
        ) -> list[AgentResponse[TJobResult]]
        

        Execute a job across all team members in parallel.

        Parameters:

        Name Type Description Default
        job Job[TDeps, TJobResult]

        Job configuration to execute

        required
        store_history bool

        Whether to add job execution to conversation history

        True
        include_agent_tools bool

        Whether to include agent's tools alongside job tools

        True

        Returns:

        Type Description
        list[AgentResponse[TJobResult]]

        List of responses from all agents

        Raises:

        Type Description
        JobError

        If job execution fails for any agent

        ValueError

        If job configuration is invalid

        Source code in src/llmling_agent/delegation/team.py
        async def run_job[TJobResult](
            self,
            job: Job[TDeps, TJobResult],
            *,
            store_history: bool = True,
            include_agent_tools: bool = True,
        ) -> list[AgentResponse[TJobResult]]:
            """Execute a job across all team members in parallel.
        
            Args:
                job: Job configuration to execute
                store_history: Whether to add job execution to conversation history
                include_agent_tools: Whether to include agent's tools alongside job tools
        
            Returns:
                List of responses from all agents
        
            Raises:
                JobError: If job execution fails for any agent
                ValueError: If job configuration is invalid
            """
            from llmling_agent.agent import Agent, StructuredAgent
            from llmling_agent.tasks import JobError
        
            responses: list[AgentResponse[TJobResult]] = []
            errors: dict[str, Exception] = {}
            start_time = datetime.now()
        
            # Validate dependencies for all agents
            if job.required_dependency is not None:
                invalid_agents = [
                    agent.name
                    for agent in self.iter_agents()
                    if not isinstance(agent.context.data, job.required_dependency)
                ]
                if invalid_agents:
                    msg = (
                        f"Agents {', '.join(invalid_agents)} don't have required "
                        f"dependency type: {job.required_dependency}"
                    )
                    raise JobError(msg)
        
            try:
                # Load knowledge for all agents if provided
                if job.knowledge:
                    # TODO: resources
                    tools = [t.name for t in job.get_tools()]
                    await self.distribute(content="", tools=tools)
        
                prompt = await job.get_prompt()
        
                async def _run(agent: MessageNode[TDeps, TJobResult]):
                    assert isinstance(agent, Agent | StructuredAgent)
                    try:
                        with agent.tools.temporary_tools(
                            job.get_tools(), exclusive=not include_agent_tools
                        ):
                            start = perf_counter()
                            resp = AgentResponse(
                                agent_name=agent.name,
                                message=await agent.run(prompt, store_history=store_history),  # pyright: ignore
                                timing=perf_counter() - start,
                            )
                            responses.append(resp)
                    except Exception as e:  # noqa: BLE001
                        errors[agent.name] = e
        
                # Run job in parallel on all agents
                await asyncio.gather(*[_run(node) for node in self.agents])
        
                return TeamResponse(responses=responses, start_time=start_time, errors=errors)
        
            except Exception as e:
                msg = "Job execution failed"
                logger.exception(msg)
                raise JobError(msg) from e
        
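The dependency check at the top of `run_job` can be sketched on its own: every member's context data must be an instance of the job's required dependency type, and offenders are collected by name before anything runs. `DatabaseDeps` and the agent names here are hypothetical:

```python
# Sketch of run_job's required_dependency validation; types/names are hypothetical.
from dataclasses import dataclass

@dataclass
class DatabaseDeps:
    dsn: str

def find_invalid(agents: dict[str, object], required: type) -> list[str]:
    # Names of agents whose dependency object fails the isinstance check.
    return [name for name, deps in agents.items() if not isinstance(deps, required)]

invalid = find_invalid(
    {"reader": DatabaseDeps("sqlite://"), "writer": "not-deps"},
    DatabaseDeps,
)
```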

        TeamRun

        Bases: BaseTeam[TDeps, TResult]

        Handles team operations with monitoring.

        Source code in src/llmling_agent/delegation/teamrun.py
        class TeamRun[TDeps, TResult](BaseTeam[TDeps, TResult]):
            """Handles team operations with monitoring."""
        
            def __init__(
                self,
                agents: Sequence[MessageNode[TDeps, Any]],
                *,
                name: str | None = None,
                description: str | None = None,
                shared_prompt: str | None = None,
                validator: MessageNode[Any, TResult] | None = None,
                picker: AnyAgent[Any, Any] | None = None,
                num_picks: int | None = None,
                pick_prompt: str | None = None,
                # result_mode: ResultMode = "last",
            ):
                super().__init__(
                    agents,
                    name=name,
                    description=description,
                    shared_prompt=shared_prompt,
                    picker=picker,
                    num_picks=num_picks,
                    pick_prompt=pick_prompt,
                )
                self.validator = validator
                self.result_mode = "last"
        
            def __prompt__(self) -> str:
                """Format team info for prompts."""
                members = " -> ".join(a.name for a in self.agents)
                desc = f" - {self.description}" if self.description else ""
                return f"Sequential Team '{self.name}'{desc}\nPipeline: {members}"
        
            async def _run(
                self,
                *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                wait_for_connections: bool | None = None,
                message_id: str | None = None,
                conversation_id: str | None = None,
                **kwargs: Any,
            ) -> ChatMessage[TResult]:
                """Run agents sequentially and return combined message.
        
        This method wraps execute and extracts the ChatMessage in order to fulfill
                the "message protocol".
                """
                message_id = message_id or str(uuid4())
        
                result = await self.execute(*prompts, **kwargs)
                all_messages = [r.message for r in result if r.message]
                assert all_messages, "Error during execution, returned None for TeamRun"
                # Determine content based on mode
                match self.result_mode:
                    case "last":
                        content = all_messages[-1].content
                    # case "concat":
                    #     content = "\n".join(msg.format() for msg in all_messages)
                    case _:
                        msg = f"Invalid result mode: {self.result_mode}"
                        raise ValueError(msg)
        
                return ChatMessage(
                    content=content,
                    role="assistant",
                    name=self.name,
                    associated_messages=all_messages,
                    message_id=message_id,
                    conversation_id=conversation_id,
                    metadata={
                        "execution_order": [r.agent_name for r in result],
                        "start_time": result.start_time.isoformat(),
                        "errors": {name: str(error) for name, error in result.errors.items()},
                    },
                )
        
            async def execute(
                self,
                *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                **kwargs: Any,
            ) -> TeamResponse[TResult]:
                """Start execution with optional monitoring."""
                self._team_talk.clear()
                start_time = datetime.now()
                final_prompt = list(prompts)
                if self.shared_prompt:
                    final_prompt.insert(0, self.shared_prompt)
        
                responses = [
                    i
                    async for i in self.execute_iter(*final_prompt)
                    if isinstance(i, AgentResponse)
                ]
                return TeamResponse(responses, start_time)
        
            async def run_iter(
                self,
                *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
                **kwargs: Any,
            ) -> AsyncIterator[ChatMessage[Any]]:
                """Yield messages from the execution chain."""
                async for item in self.execute_iter(*prompts, **kwargs):
                    match item:
                        case AgentResponse():
                            if item.message:
                                yield item.message
                        case Talk():
                            pass
        
            async def execute_iter(
                self,
                *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str],
                **kwargs: Any,
            ) -> AsyncIterator[Talk[Any] | AgentResponse[Any]]:
                from toprompt import to_prompt
        
                connections: list[Talk[Any]] = []
                try:
                    combined_prompt = "\n".join([await to_prompt(p) for p in prompt])
                    all_nodes = list(await self.pick_agents(combined_prompt))
                    if self.validator:
                        all_nodes.append(self.validator)
                    first = all_nodes[0]
                    connections = [
                        source.connect_to(target, queued=True)
                        for source, target in pairwise(all_nodes)
                    ]
                    for conn in connections:
                        self._team_talk.append(conn)
        
                    # First agent
                    start = perf_counter()
                    message = await first.run(*prompt, **kwargs)
                    timing = perf_counter() - start
                    response = AgentResponse[Any](first.name, message=message, timing=timing)
                    yield response
        
                    # Process through chain
                    for connection in connections:
                        target = connection.targets[0]
                        target_name = target.name
                        yield connection
        
                        # Let errors propagate - they break the chain
                        start = perf_counter()
                        messages = await connection.trigger()
        
                        # If this is the last node
                        if target == all_nodes[-1]:
                            last_talk = Talk[Any](target, [], connection_type="run")
                            if response.message:
                                last_talk.stats.messages.append(response.message)
                            self._team_talk.append(last_talk)
        
                        timing = perf_counter() - start
                        msg = messages[0]
                        response = AgentResponse[Any](target_name, message=msg, timing=timing)
                        yield response
        
                finally:
                    # Always clean up connections
                    for connection in connections:
                        connection.disconnect()
        
            @asynccontextmanager
            async def chain_stream(
                self,
                *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                require_all: bool = True,
                **kwargs: Any,
            ) -> AsyncIterator[StreamingResponseProtocol]:
                """Stream results through chain of team members."""
                from llmling_agent.agent import Agent, StructuredAgent
                from llmling_agent.delegation import TeamRun
                from llmling_agent_providers.base import StreamingResponseProtocol
        
                async with AsyncExitStack() as stack:
                    streams: list[StreamingResponseProtocol[str]] = []
                    current_message = prompts
        
                    # Set up all streams
                    for agent in self.agents:
                        try:
                            assert isinstance(agent, TeamRun | Agent | StructuredAgent), (
                                "Cannot stream teams!"
                            )
                            stream = await stack.enter_async_context(
                                agent.run_stream(*current_message, **kwargs)
                            )
                            streams.append(stream)  # type: ignore
                            # Wait for complete response for next agent
                            async for chunk in stream.stream():
                                current_message = chunk
                                if stream.is_complete:
                                    current_message = (stream.formatted_content,)  # type: ignore
                                    break
                        except Exception as e:
                            if require_all:
                                msg = f"Chain broken at {agent.name}: {e}"
                                raise ValueError(msg) from e
                            logger.warning("Chain handler %s failed: %s", agent.name, e)
        
                    # Create a stream-like interface for the chain
                    class ChainStream(StreamingResponseProtocol[str]):
                        def __init__(self):
                            self.streams = streams
                            self.current_stream_idx = 0
                            self.is_complete = False
                            self.model_name = None
        
                        def usage(self) -> Usage:
                            @dataclass
                            class Usage:
                                total_tokens: int | None
                                request_tokens: int | None
                                response_tokens: int | None
        
                            return Usage(0, 0, 0)
        
                        async def stream(self) -> AsyncIterator[str]:  # type: ignore
                            for idx, stream in enumerate(self.streams):
                                self.current_stream_idx = idx
                                async for chunk in stream.stream():
                                    yield chunk
                                    if idx == len(self.streams) - 1 and stream.is_complete:
                                        self.is_complete = True
        
                    yield ChainStream()
        
            @asynccontextmanager
            async def run_stream(
                self,
                *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
                **kwargs: Any,
            ) -> AsyncIterator[StreamingResponseProtocol[TResult]]:
                """Stream responses through the chain.
        
                Provides same interface as Agent.run_stream.
                """
                async with self.chain_stream(*prompts, **kwargs) as stream:
                    yield stream
        

        __prompt__

        __prompt__() -> str
        

        Format team info for prompts.

        Source code in src/llmling_agent/delegation/teamrun.py
        def __prompt__(self) -> str:
            """Format team info for prompts."""
            members = " -> ".join(a.name for a in self.agents)
            desc = f" - {self.description}" if self.description else ""
            return f"Sequential Team '{self.name}'{desc}\nPipeline: {members}"
        

        _run async

        _run(
            *prompts: AnyPromptType | Image | PathLike[str] | None,
            wait_for_connections: bool | None = None,
            message_id: str | None = None,
            conversation_id: str | None = None,
            **kwargs: Any,
        ) -> ChatMessage[TResult]
        

        Run agents sequentially and return combined message.

This method wraps execute and extracts the ChatMessage in order to fulfill the "message protocol".

        Source code in src/llmling_agent/delegation/teamrun.py
        async def _run(
            self,
            *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
            wait_for_connections: bool | None = None,
            message_id: str | None = None,
            conversation_id: str | None = None,
            **kwargs: Any,
        ) -> ChatMessage[TResult]:
            """Run agents sequentially and return combined message.
        
    This method wraps execute and extracts the ChatMessage in order to fulfill
            the "message protocol".
            """
            message_id = message_id or str(uuid4())
        
            result = await self.execute(*prompts, **kwargs)
            all_messages = [r.message for r in result if r.message]
            assert all_messages, "Error during execution, returned None for TeamRun"
            # Determine content based on mode
            match self.result_mode:
                case "last":
                    content = all_messages[-1].content
                # case "concat":
                #     content = "\n".join(msg.format() for msg in all_messages)
                case _:
                    msg = f"Invalid result mode: {self.result_mode}"
                    raise ValueError(msg)
        
            return ChatMessage(
                content=content,
                role="assistant",
                name=self.name,
                associated_messages=all_messages,
                message_id=message_id,
                conversation_id=conversation_id,
                metadata={
                    "execution_order": [r.agent_name for r in result],
                    "start_time": result.start_time.isoformat(),
                    "errors": {name: str(error) for name, error in result.errors.items()},
                },
            )
        

        chain_stream async

        chain_stream(
            *prompts: AnyPromptType | Image | PathLike[str] | None,
            require_all: bool = True,
            **kwargs: Any,
        ) -> AsyncIterator[StreamingResponseProtocol]
        

        Stream results through chain of team members.

        Source code in src/llmling_agent/delegation/teamrun.py
        @asynccontextmanager
        async def chain_stream(
            self,
            *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
            require_all: bool = True,
            **kwargs: Any,
        ) -> AsyncIterator[StreamingResponseProtocol]:
            """Stream results through chain of team members."""
            from llmling_agent.agent import Agent, StructuredAgent
            from llmling_agent.delegation import TeamRun
            from llmling_agent_providers.base import StreamingResponseProtocol
        
            async with AsyncExitStack() as stack:
                streams: list[StreamingResponseProtocol[str]] = []
                current_message = prompts
        
                # Set up all streams
                for agent in self.agents:
                    try:
                        assert isinstance(agent, TeamRun | Agent | StructuredAgent), (
                            "Cannot stream teams!"
                        )
                        stream = await stack.enter_async_context(
                            agent.run_stream(*current_message, **kwargs)
                        )
                        streams.append(stream)  # type: ignore
                        # Wait for complete response for next agent
                        async for chunk in stream.stream():
                            current_message = chunk
                            if stream.is_complete:
                                current_message = (stream.formatted_content,)  # type: ignore
                                break
                    except Exception as e:
                        if require_all:
                            msg = f"Chain broken at {agent.name}: {e}"
                            raise ValueError(msg) from e
                        logger.warning("Chain handler %s failed: %s", agent.name, e)
        
                # Create a stream-like interface for the chain
                class ChainStream(StreamingResponseProtocol[str]):
                    def __init__(self):
                        self.streams = streams
                        self.current_stream_idx = 0
                        self.is_complete = False
                        self.model_name = None
        
                    def usage(self) -> Usage:
                        @dataclass
                        class Usage:
                            total_tokens: int | None
                            request_tokens: int | None
                            response_tokens: int | None
        
                        return Usage(0, 0, 0)
        
                    async def stream(self) -> AsyncIterator[str]:  # type: ignore
                        for idx, stream in enumerate(self.streams):
                            self.current_stream_idx = idx
                            async for chunk in stream.stream():
                                yield chunk
                                if idx == len(self.streams) - 1 and stream.is_complete:
                                    self.is_complete = True
        
                yield ChainStream()
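The hand-off pattern above (each member streams to completion, then its full text becomes the next member's input) can be exercised with mock streams. `MockStream` and `mock_agent` are hypothetical stand-ins for `StreamingResponseProtocol` and an agent's `run_stream`; they only model the completion/hand-off behaviour:

```python
import asyncio
from collections.abc import AsyncIterator


class MockStream:
    """Hypothetical stand-in for StreamingResponseProtocol[str]."""

    def __init__(self, text: str):
        self._text = text
        self.is_complete = False
        self.formatted_content = text

    async def stream(self) -> AsyncIterator[str]:
        for word in self._text.split():
            yield word
        self.is_complete = True


def mock_agent(suffix: str):
    """Hypothetical agent: echoes its input with a suffix appended."""

    def run_stream(prompt: str) -> MockStream:
        return MockStream(f"{prompt} {suffix}")

    return run_stream


async def chain(prompt: str, agents) -> str:
    """Mirror chain_stream's hand-off: drain each stream, feed the text onward."""
    current = prompt
    for agent in agents:
        stream = agent(current)
        async for _chunk in stream.stream():  # wait for the complete response
            pass
        current = stream.formatted_content
    return current


result = asyncio.run(chain("start", [mock_agent("a"), mock_agent("b")]))
print(result)
# start a b
```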
        

        execute async

        execute(
            *prompts: AnyPromptType | Image | PathLike[str] | None, **kwargs: Any
        ) -> TeamResponse[TResult]
        

        Start execution with optional monitoring.

        Source code in src/llmling_agent/delegation/teamrun.py
        async def execute(
            self,
            *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
            **kwargs: Any,
        ) -> TeamResponse[TResult]:
            """Start execution with optional monitoring."""
            self._team_talk.clear()
            start_time = datetime.now()
            final_prompt = list(prompts)
            if self.shared_prompt:
                final_prompt.insert(0, self.shared_prompt)
        
            responses = [
                i
                async for i in self.execute_iter(*final_prompt)
                if isinstance(i, AgentResponse)
            ]
            return TeamResponse(responses, start_time)
        

        run_iter async

        run_iter(
            *prompts: AnyPromptType | Image | PathLike[str], **kwargs: Any
        ) -> AsyncIterator[ChatMessage[Any]]
        

        Yield messages from the execution chain.

        Source code in src/llmling_agent/delegation/teamrun.py
        async def run_iter(
            self,
            *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
            **kwargs: Any,
        ) -> AsyncIterator[ChatMessage[Any]]:
            """Yield messages from the execution chain."""
            async for item in self.execute_iter(*prompts, **kwargs):
                match item:
                    case AgentResponse():
                        if item.message:
                            yield item.message
                    case Talk():
                        pass
        

        run_stream async

        run_stream(
            *prompts: AnyPromptType | Image | PathLike[str], **kwargs: Any
        ) -> AsyncIterator[StreamingResponseProtocol[TResult]]
        

        Stream responses through the chain.

        Provides same interface as Agent.run_stream.

        Source code in src/llmling_agent/delegation/teamrun.py
        @asynccontextmanager
        async def run_stream(
            self,
            *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
            **kwargs: Any,
        ) -> AsyncIterator[StreamingResponseProtocol[TResult]]:
            """Stream responses through the chain.
        
            Provides same interface as Agent.run_stream.
            """
            async with self.chain_stream(*prompts, **kwargs) as stream:
                yield stream
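Since `run_stream` only re-exports `chain_stream`'s value, the whole method is an async-context-manager delegation. A generic sketch of that pattern (function names hypothetical, unrelated to the real API):

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def inner():
    """Hypothetical inner context manager standing in for chain_stream."""
    yield "stream"


@asynccontextmanager
async def run_stream():
    """Mirror run_stream: re-yield the inner manager's value unchanged."""
    async with inner() as stream:
        yield stream


async def main() -> str:
    async with run_stream() as s:
        return s


print(asyncio.run(main()))
# stream
```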