Skip to content

AgentPool

Base classes

Name Children Inherits
BaseRegistry
llmling.core.baseregistry
Base class for registries providing item storage and change notifications.

⋔ Inheritance diagram

graph TD
  94350421139104["pool.AgentPool"]
  94350400308096["baseregistry.BaseRegistry"]
  94350360924160["abc.MutableMapping"]
  94350360920192["abc.Mapping"]
  94350360914240["abc.Collection"]
  94350360912256["abc.Sized"]
  140709601677504["builtins.object"]
  94350360908288["abc.Iterable"]
  94350360913248["abc.Container"]
  94350360717680["abc.ABC"]
  94350360566400["typing.Generic"]
  94350400308096 --> 94350421139104
  94350360924160 --> 94350400308096
  94350360920192 --> 94350360924160
  94350360914240 --> 94350360920192
  94350360912256 --> 94350360914240
  140709601677504 --> 94350360912256
  94350360908288 --> 94350360914240
  140709601677504 --> 94350360908288
  94350360913248 --> 94350360914240
  140709601677504 --> 94350360913248
  94350360717680 --> 94350400308096
  140709601677504 --> 94350360717680
  94350360566400 --> 94350400308096
  140709601677504 --> 94350360566400

🛈 DocStrings

Bases: BaseRegistry[str, AnyAgent[Any, Any]]

Pool of initialized agents.

Each agent maintains its own runtime environment based on its configuration.

Source code in src/llmling_agent/delegation/pool.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
class AgentPool(BaseRegistry[str, AnyAgent[Any, Any]]):
    """Pool of initialized agents.

    Each agent maintains its own runtime environment based on its configuration.
    """

    def __init__(
        self,
        manifest: AgentsManifest,
        *,
        agents_to_load: list[str] | None = None,
        connect_agents: bool = True,
        confirmation_callback: ConfirmationCallback | None = None,
    ):
        """Initialize agent pool with immediate agent creation.

        Args:
            manifest: Agent configuration manifest
            agents_to_load: Optional list of agent names to initialize
                          If None, all agents from manifest are loaded
            connect_agents: Whether to set up forwarding connections
            confirmation_callback: Handler callback for tool / step confirmations.
        """
        super().__init__()
        from llmling_agent.models.context import AgentContext
        from llmling_agent.storage.manager import StorageManager

        self.manifest = manifest
        self._confirmation_callback = confirmation_callback
        self.exit_stack = AsyncExitStack()
        self.storage = StorageManager(manifest.storage)

        # Validate requested agents exist
        to_load = set(agents_to_load) if agents_to_load else set(manifest.agents)
        if invalid := (to_load - set(manifest.agents)):
            msg = f"Unknown agents: {', '.join(invalid)}"
            raise ValueError(msg)
        # register tasks
        self._tasks = TaskRegistry()
        # Register tasks from manifest
        for name, task in manifest.tasks.items():
            self._tasks.register(name, task)
        self.pool_talk = TeamTalk.from_agents(list(self.agents.values()))
        # Create requested agents immediately using sync initialization
        for name in to_load:
            config = manifest.agents[name]
            # Create runtime without async context
            cfg = config.get_config()
            runtime = RuntimeConfig.from_config(cfg)

            # Create context with config path and capabilities
            context = AgentContext[Any](
                agent_name=name,
                capabilities=config.capabilities,
                definition=self.manifest,
                config=config,
                pool=self,
                confirmation_callback=confirmation_callback,
            )

            # Create agent with runtime and context
            agent = Agent[Any](
                runtime=runtime,
                context=context,
                result_type=None,  # type: ignore[arg-type]
                model=config.model,  # type: ignore[arg-type]
                system_prompt=config.system_prompts,
                name=name,
                enable_db_logging=config.enable_db_logging,
            )
            self.register(name, agent)

        # Then set up worker relationships
        for name, config in manifest.agents.items():
            if name in self and config.workers:
                self.setup_agent_workers(self[name], config.workers)

        # Set up forwarding connections
        if connect_agents:
            self._connect_signals()

    async def __aenter__(self) -> Self:
        """Enter async context and initialize all agents."""
        try:
            # Enter async context for all agents
            for agent in self.agents.values():
                await self.exit_stack.enter_async_context(agent)
        except Exception as e:
            await self.cleanup()
            msg = "Failed to initialize agent pool"
            logger.exception(msg, exc_info=e)
            raise RuntimeError(msg) from e
        else:
            return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ):
        """Exit async context."""
        await self.cleanup()

    async def cleanup(self):
        """Clean up all agents."""
        for agent in self.values():
            if agent.runtime:
                await agent.runtime.shutdown()
        await self.exit_stack.aclose()
        self.clear()

    def create_group[TDeps](
        self,
        agents: Sequence[str | AnyAgent[TDeps, Any]] | None = None,
        *,
        model_override: str | None = None,
        environment_override: StrPath | Config | None = None,
        shared_prompt: str | None = None,
        shared_deps: TDeps | None = None,
    ) -> Team[TDeps]:
        """Create a group from agent names or instances.

        Args:
            agents: List of agent names or instances (all if None)
            model_override: Optional model to use for all agents
            environment_override: Optional environment for all agents
            shared_prompt: Optional prompt for all agents
            shared_deps: Optional shared dependencies
        """
        from llmling_agent.delegation.agentgroup import Team

        if agents is None:
            agents = list(self.agents.keys())

        # First resolve/configure agents
        resolved_agents: list[AnyAgent[TDeps, Any]] = []
        for agent in agents:
            if isinstance(agent, str):
                agent = self.get_agent(
                    agent,
                    model_override=model_override,
                    environment_override=environment_override,
                )
            resolved_agents.append(agent)

        return Team(
            agents=resolved_agents,
            # pool=self,
            shared_prompt=shared_prompt,
            shared_deps=shared_deps,
        )

    def start_supervision(self) -> OptionalAwaitable[None]:
        """Start supervision interface.

        Can be called either synchronously or asynchronously:

        # Sync usage:
        start_supervision(pool)

        # Async usage:
        await start_supervision(pool)
        """
        from llmling_agent.delegation.supervisor_ui import SupervisorApp

        app = SupervisorApp(self)
        if asyncio.get_event_loop().is_running():
            # We're in an async context
            return app.run_async()
        # We're in a sync context
        app.run()
        return None

    @property
    def agents(self) -> EventedDict[str, AnyAgent[Any, Any]]:
        """Get agents dict (backward compatibility)."""
        return self._items

    @property
    def _error_class(self) -> type[LLMLingError]:
        """Error class for agent operations."""
        return LLMLingError

    def _validate_item(self, item: Agent[Any] | Any) -> Agent[Any]:
        """Validate and convert items before registration.

        Args:
            item: Item to validate

        Returns:
            Validated Agent

        Raises:
            LLMlingError: If item is not a valid agent
        """
        if not isinstance(item, Agent):
            msg = f"Item must be Agent, got {type(item)}"
            raise self._error_class(msg)
        return item

    def _setup_connections(self):
        """Set up forwarding connections between agents."""
        from llmling_agent.models.forward_targets import AgentTarget

        for name, config in self.manifest.agents.items():
            if name not in self.agents:
                continue
            agent = self.agents[name]
            for target in config.forward_to:
                if isinstance(target, AgentTarget):
                    if target.name not in self.agents:
                        msg = f"Forward target {target.name} not loaded for {name}"
                        raise ValueError(msg)
                    target_agent = self.agents[target.name]
                    agent.pass_results_to(target_agent)

    def _connect_signals(self):
        """Set up forwarding connections between agents."""
        from llmling_agent.models.forward_targets import AgentTarget

        for name, config in self.manifest.agents.items():
            if name not in self.agents:
                continue
            agent = self.agents[name]
            for target in config.forward_to:
                if isinstance(target, AgentTarget):
                    if target.name not in self.agents:
                        msg = f"Forward target {target.name} not loaded for {name}"
                        raise ValueError(msg)
                    target_agent = self.agents[target.name]
                    agent.pass_results_to(
                        target_agent,
                        connection_type=target.connection_type,
                    )

    async def create_agent(
        self,
        name: str,
        config: AgentConfig,
        *,
        temporary: bool = True,
    ) -> Agent[Any]:
        """Create and register a new agent in the pool.

        Args:
            name: Name of the new agent
            config: Agent configuration
            temporary: If True, agent won't be added to manifest

        Returns:
            Created and initialized agent

        Raises:
            ValueError: If agent name already exists
            RuntimeError: If agent initialization fails
        """
        from llmling_agent.models.context import AgentContext

        if name in self.agents:
            msg = f"Agent {name} already exists"
            raise ValueError(msg)

        try:
            # Create runtime from agent's config
            cfg = config.get_config()
            runtime = RuntimeConfig.from_config(cfg)

            # Create context with config path and capabilities
            context = AgentContext[Any](
                agent_name=name,
                capabilities=config.capabilities,
                definition=self.manifest,
                config=config,
                pool=self,
            )

            # Create agent with runtime and context
            agent = Agent[Any](
                agent_type=config.get_provider(),
                runtime=runtime,
                context=context,
                result_type=None,  # type: ignore[arg-type]
                model=config.model,  # type: ignore[arg-type]
                system_prompt=config.system_prompts,
                name=name,
            )

            # Enter agent's async context through pool's exit stack
            agent = await self.exit_stack.enter_async_context(agent)

            # Set up workers if defined
            if config.workers:
                self.setup_agent_workers(agent, config.workers)

            # Register in pool and optionally manifest
            self.agents[name] = agent
            if not temporary:
                self.manifest.agents[name] = config
        except Exception as e:
            msg = f"Failed to create agent {name}"
            raise RuntimeError(msg) from e
        else:
            return agent

    async def clone_agent[TDeps, TResult](
        self,
        agent: Agent[TDeps] | str,
        new_name: str | None = None,
        *,
        model_override: str | None = None,
        system_prompts: list[str] | None = None,
        template_context: dict[str, Any] | None = None,
    ) -> Agent[TDeps]:
        """Create a copy of an agent.

        Args:
            agent: Agent instance or name to clone
            new_name: Optional name for the clone
            model_override: Optional different model
            system_prompts: Optional different prompts
            template_context: Variables for template rendering

        Returns:
            The new agent instance
        """
        # Get original config
        if isinstance(agent, str):
            if agent not in self.manifest.agents:
                msg = f"Agent {agent} not found"
                raise KeyError(msg)
            config = self.manifest.agents[agent]
            original_agent: Agent[TDeps] = self.get_agent(agent)
        else:
            config = agent.context.config  # type: ignore
            original_agent = agent

        # Create new config
        new_config = config.model_copy(deep=True)

        # Apply overrides
        if model_override:
            new_config.model = model_override
        if system_prompts:
            new_config.system_prompts = system_prompts

        # Handle template rendering
        if template_context:
            new_config.system_prompts = new_config.render_system_prompts(template_context)

        # Create new agent with same runtime
        new_agent = Agent[TDeps](
            runtime=original_agent.runtime,
            context=original_agent.context,
            # result_type=original_agent.actual_type,
            model=new_config.model,  # type: ignore
            system_prompt=new_config.system_prompts,
            name=new_name or f"{config.name}_copy_{len(self.agents)}",
        )

        # Register in pool
        agent_name = new_agent.name
        self.manifest.agents[agent_name] = new_config
        self.agents[agent_name] = new_agent

        return new_agent

    def setup_agent_workers(self, agent: AnyAgent[Any, Any], workers: list[WorkerConfig]):
        """Set up workers for an agent from configuration."""
        for worker_config in workers:
            try:
                worker = self.get_agent(worker_config.name)
                agent.register_worker(
                    worker,
                    name=worker_config.name,
                    reset_history_on_run=worker_config.reset_history_on_run,
                    pass_message_history=worker_config.pass_message_history,
                    share_context=worker_config.share_context,
                )
            except KeyError as e:
                msg = f"Worker agent {worker_config.name!r} not found"
                raise ValueError(msg) from e

    @overload
    def get_agent[TDeps, TResult](
        self,
        agent: str | Agent[Any],
        *,
        deps: TDeps,
        return_type: type[TResult],
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
        environment_override: StrPath | Config | None = None,
    ) -> StructuredAgent[TDeps, TResult]: ...

    @overload
    def get_agent[TDeps](
        self,
        agent: str | Agent[Any],
        *,
        deps: TDeps,
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
        environment_override: StrPath | Config | None = None,
    ) -> Agent[TDeps]: ...

    @overload
    def get_agent[TResult](
        self,
        agent: str | Agent[Any],
        *,
        return_type: type[TResult],
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
        environment_override: StrPath | Config | None = None,
    ) -> StructuredAgent[Any, TResult]: ...

    @overload
    def get_agent(
        self,
        agent: str | Agent[Any],
        *,
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
        environment_override: StrPath | Config | None = None,
    ) -> Agent[Any]: ...

    def get_agent[TDeps, TResult](
        self,
        agent: str | Agent[Any],
        *,
        deps: TDeps | None = None,
        return_type: type[TResult] | None = None,
        model_override: str | None = None,
        session: SessionIdType | SessionQuery = None,
        environment_override: StrPath | Config | None = None,
    ) -> AnyAgent[TDeps, TResult]:
        """Get or wrap an agent.

        Args:
            agent: Either agent name or instance
            deps: Dependencies for the agent
            return_type: Optional type to make agent structured
            model_override: Optional model override
            session: Optional session ID or Session query to recover conversation
            environment_override: Optional environment configuration:
                - Path to environment file
                - Complete Config instance
                - None to use agent's default environment

        Returns:
            Either regular Agent or StructuredAgent depending on return_type

        Raises:
            KeyError: If agent name not found
            ValueError: If environment configuration is invalid
        """
        # Get base agent
        base = agent if isinstance(agent, Agent) else self.agents[agent]
        if deps is not None:
            base.context = base.context or AgentContext[TDeps].create_default(base.name)
            base.context.data = deps

        # Apply overrides
        if model_override:
            base.set_model(model_override)  # type: ignore

        if session:
            base.conversation.load_history_from_database(session=session)
        match environment_override:
            case Config():
                base.context.runtime = RuntimeConfig.from_config(environment_override)
            case str() | PathLike():
                base.context.runtime = RuntimeConfig.from_file(environment_override)

        # Wrap in StructuredAgent if return_type provided
        if return_type is not None:
            return StructuredAgent[Any, TResult](base, return_type)

        return base

    @classmethod
    @asynccontextmanager
    async def open[TDeps, TResult](
        cls,
        config_path: StrPath | AgentsManifest[TDeps, TResult] | None = None,
        *,
        agents: list[str] | None = None,
        connect_agents: bool = True,
        confirmation_callback: ConfirmationCallback | None = None,
    ) -> AsyncIterator[AgentPool]:
        """Open an agent pool from configuration.

        Args:
            config_path: Path to agent configuration file or manifest
            agents: Optional list of agent names to initialize
            connect_agents: Whether to set up forwarding connections
            confirmation_callback: Callback to confirm agent tool selection

        Yields:
            Configured agent pool
        """
        from llmling_agent.models import AgentsManifest

        match config_path:
            case None:
                manifest = AgentsManifest[Any, Any]()
            case str():
                manifest = AgentsManifest[Any, Any].from_file(config_path)
            case AgentsManifest():
                manifest = config_path
            case _:
                msg = f"Invalid config path: {config_path}"
                raise ValueError(msg)
        pool = cls(
            manifest,
            agents_to_load=agents,
            connect_agents=connect_agents,
            confirmation_callback=confirmation_callback,
        )
        try:
            async with pool:
                yield pool
        finally:
            await pool.cleanup()

    def list_agents(self) -> list[str]:
        """List available agent names."""
        return list(self.manifest.agents)

    def get_task(self, name: str) -> AgentTask[Any, Any]:
        return self._tasks[name]

    def register_task(self, name: str, task: AgentTask[Any, Any]):
        self._tasks.register(name, task)

    async def controlled_conversation(
        self,
        initial_agent: str | Agent[Any] = "starter",
        initial_prompt: str = "Hello!",
        decision_callback: DecisionCallback = interactive_controller,
    ):
        """Start a controlled conversation between agents.

        Args:
            initial_agent: Agent instance or name to start with
            initial_prompt: First message to start conversation
            decision_callback: Callback for routing decisions
        """
        from llmling_agent.delegation.agentgroup import Team

        group = Team(list(self.agents.values()))

        await group.run_controlled(
            prompt=initial_prompt,
            initial_agent=initial_agent,
            decision_callback=decision_callback,
        )

agents property

agents: EventedDict[str, AnyAgent[Any, Any]]

Get agents dict (backward compatibility).

__aenter__ async

__aenter__() -> Self

Enter async context and initialize all agents.

Source code in src/llmling_agent/delegation/pool.py
155
156
157
158
159
160
161
162
163
164
165
166
167
async def __aenter__(self) -> Self:
    """Enter async context and initialize all agents."""
    try:
        # Enter async context for all agents
        for agent in self.agents.values():
            await self.exit_stack.enter_async_context(agent)
    except Exception as e:
        await self.cleanup()
        msg = "Failed to initialize agent pool"
        logger.exception(msg, exc_info=e)
        raise RuntimeError(msg) from e
    else:
        return self

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
)

Exit async context.

Source code in src/llmling_agent/delegation/pool.py
169
170
171
172
173
174
175
176
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
):
    """Exit async context."""
    await self.cleanup()

__init__

__init__(
    manifest: AgentsManifest,
    *,
    agents_to_load: list[str] | None = None,
    connect_agents: bool = True,
    confirmation_callback: ConfirmationCallback | None = None,
)

Initialize agent pool with immediate agent creation.

Parameters:

Name Type Description Default
manifest AgentsManifest

Agent configuration manifest

required
agents_to_load list[str] | None

Optional list of agent names to initialize If None, all agents from manifest are loaded

None
connect_agents bool

Whether to set up forwarding connections

True
confirmation_callback ConfirmationCallback | None

Handler callback for tool / step confirmations.

None
Source code in src/llmling_agent/delegation/pool.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def __init__(
    self,
    manifest: AgentsManifest,
    *,
    agents_to_load: list[str] | None = None,
    connect_agents: bool = True,
    confirmation_callback: ConfirmationCallback | None = None,
):
    """Initialize agent pool with immediate agent creation.

    Args:
        manifest: Agent configuration manifest
        agents_to_load: Optional list of agent names to initialize
                      If None, all agents from manifest are loaded
        connect_agents: Whether to set up forwarding connections
        confirmation_callback: Handler callback for tool / step confirmations.
    """
    super().__init__()
    from llmling_agent.models.context import AgentContext
    from llmling_agent.storage.manager import StorageManager

    self.manifest = manifest
    self._confirmation_callback = confirmation_callback
    self.exit_stack = AsyncExitStack()
    self.storage = StorageManager(manifest.storage)

    # Validate requested agents exist
    to_load = set(agents_to_load) if agents_to_load else set(manifest.agents)
    if invalid := (to_load - set(manifest.agents)):
        msg = f"Unknown agents: {', '.join(invalid)}"
        raise ValueError(msg)
    # register tasks
    self._tasks = TaskRegistry()
    # Register tasks from manifest
    for name, task in manifest.tasks.items():
        self._tasks.register(name, task)
    self.pool_talk = TeamTalk.from_agents(list(self.agents.values()))
    # Create requested agents immediately using sync initialization
    for name in to_load:
        config = manifest.agents[name]
        # Create runtime without async context
        cfg = config.get_config()
        runtime = RuntimeConfig.from_config(cfg)

        # Create context with config path and capabilities
        context = AgentContext[Any](
            agent_name=name,
            capabilities=config.capabilities,
            definition=self.manifest,
            config=config,
            pool=self,
            confirmation_callback=confirmation_callback,
        )

        # Create agent with runtime and context
        agent = Agent[Any](
            runtime=runtime,
            context=context,
            result_type=None,  # type: ignore[arg-type]
            model=config.model,  # type: ignore[arg-type]
            system_prompt=config.system_prompts,
            name=name,
            enable_db_logging=config.enable_db_logging,
        )
        self.register(name, agent)

    # Then set up worker relationships
    for name, config in manifest.agents.items():
        if name in self and config.workers:
            self.setup_agent_workers(self[name], config.workers)

    # Set up forwarding connections
    if connect_agents:
        self._connect_signals()

cleanup async

cleanup()

Clean up all agents.

Source code in src/llmling_agent/delegation/pool.py
178
179
180
181
182
183
184
async def cleanup(self):
    """Clean up all agents."""
    for agent in self.values():
        if agent.runtime:
            await agent.runtime.shutdown()
    await self.exit_stack.aclose()
    self.clear()

clone_agent async

clone_agent(
    agent: Agent[TDeps] | str,
    new_name: str | None = None,
    *,
    model_override: str | None = None,
    system_prompts: list[str] | None = None,
    template_context: dict[str, Any] | None = None,
) -> Agent[TDeps]

Create a copy of an agent.

Parameters:

Name Type Description Default
agent Agent[TDeps] | str

Agent instance or name to clone

required
new_name str | None

Optional name for the clone

None
model_override str | None

Optional different model

None
system_prompts list[str] | None

Optional different prompts

None
template_context dict[str, Any] | None

Variables for template rendering

None

Returns:

Type Description
Agent[TDeps]

The new agent instance

Source code in src/llmling_agent/delegation/pool.py
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
async def clone_agent[TDeps, TResult](
    self,
    agent: Agent[TDeps] | str,
    new_name: str | None = None,
    *,
    model_override: str | None = None,
    system_prompts: list[str] | None = None,
    template_context: dict[str, Any] | None = None,
) -> Agent[TDeps]:
    """Create a copy of an agent.

    Args:
        agent: Agent instance or name to clone
        new_name: Optional name for the clone
        model_override: Optional different model
        system_prompts: Optional different prompts
        template_context: Variables for template rendering

    Returns:
        The new agent instance
    """
    # Get original config
    if isinstance(agent, str):
        if agent not in self.manifest.agents:
            msg = f"Agent {agent} not found"
            raise KeyError(msg)
        config = self.manifest.agents[agent]
        original_agent: Agent[TDeps] = self.get_agent(agent)
    else:
        config = agent.context.config  # type: ignore
        original_agent = agent

    # Create new config
    new_config = config.model_copy(deep=True)

    # Apply overrides
    if model_override:
        new_config.model = model_override
    if system_prompts:
        new_config.system_prompts = system_prompts

    # Handle template rendering
    if template_context:
        new_config.system_prompts = new_config.render_system_prompts(template_context)

    # Create new agent with same runtime
    new_agent = Agent[TDeps](
        runtime=original_agent.runtime,
        context=original_agent.context,
        # result_type=original_agent.actual_type,
        model=new_config.model,  # type: ignore
        system_prompt=new_config.system_prompts,
        name=new_name or f"{config.name}_copy_{len(self.agents)}",
    )

    # Register in pool
    agent_name = new_agent.name
    self.manifest.agents[agent_name] = new_config
    self.agents[agent_name] = new_agent

    return new_agent

controlled_conversation async

controlled_conversation(
    initial_agent: str | Agent[Any] = "starter",
    initial_prompt: str = "Hello!",
    decision_callback: DecisionCallback = interactive_controller,
)

Start a controlled conversation between agents.

Parameters:

Name Type Description Default
initial_agent str | Agent[Any]

Agent instance or name to start with

'starter'
initial_prompt str

First message to start conversation

'Hello!'
decision_callback DecisionCallback

Callback for routing decisions

interactive_controller
Source code in src/llmling_agent/delegation/pool.py
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
async def controlled_conversation(
    self,
    initial_agent: str | Agent[Any] = "starter",
    initial_prompt: str = "Hello!",
    decision_callback: DecisionCallback = interactive_controller,
):
    """Start a controlled conversation between agents.

    Args:
        initial_agent: Agent instance or name to start with
        initial_prompt: First message to start conversation
        decision_callback: Callback for routing decisions
    """
    from llmling_agent.delegation.agentgroup import Team

    group = Team(list(self.agents.values()))

    await group.run_controlled(
        prompt=initial_prompt,
        initial_agent=initial_agent,
        decision_callback=decision_callback,
    )

create_agent async

create_agent(name: str, config: AgentConfig, *, temporary: bool = True) -> Agent[Any]

Create and register a new agent in the pool.

Parameters:

Name Type Description Default
name str

Name of the new agent

required
config AgentConfig

Agent configuration

required
temporary bool

If True, agent won't be added to manifest

True

Returns:

Type Description
Agent[Any]

Created and initialized agent

Raises:

Type Description
ValueError

If agent name already exists

RuntimeError

If agent initialization fails

Source code in src/llmling_agent/delegation/pool.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
async def create_agent(
    self,
    name: str,
    config: AgentConfig,
    *,
    temporary: bool = True,
) -> Agent[Any]:
    """Create and register a new agent in the pool.

    Args:
        name: Name of the new agent
        config: Agent configuration
        temporary: If True, agent won't be added to manifest

    Returns:
        Created and initialized agent

    Raises:
        ValueError: If agent name already exists
        RuntimeError: If agent initialization fails
    """
    from llmling_agent.models.context import AgentContext

    if name in self.agents:
        msg = f"Agent {name} already exists"
        raise ValueError(msg)

    try:
        # Create runtime from agent's config
        cfg = config.get_config()
        runtime = RuntimeConfig.from_config(cfg)

        # Create context with config path and capabilities
        context = AgentContext[Any](
            agent_name=name,
            capabilities=config.capabilities,
            definition=self.manifest,
            config=config,
            pool=self,
        )

        # Create agent with runtime and context
        agent = Agent[Any](
            agent_type=config.get_provider(),
            runtime=runtime,
            context=context,
            result_type=None,  # type: ignore[arg-type]
            model=config.model,  # type: ignore[arg-type]
            system_prompt=config.system_prompts,
            name=name,
        )

        # Enter agent's async context through pool's exit stack
        agent = await self.exit_stack.enter_async_context(agent)

        # Set up workers if defined
        if config.workers:
            self.setup_agent_workers(agent, config.workers)

        # Register in pool and optionally manifest
        self.agents[name] = agent
        if not temporary:
            self.manifest.agents[name] = config
    except Exception as e:
        msg = f"Failed to create agent {name}"
        raise RuntimeError(msg) from e
    else:
        return agent

create_group

create_group(
    agents: Sequence[str | AnyAgent[TDeps, Any]] | None = None,
    *,
    model_override: str | None = None,
    environment_override: StrPath | Config | None = None,
    shared_prompt: str | None = None,
    shared_deps: TDeps | None = None,
) -> Team[TDeps]

Create a group from agent names or instances.

Parameters:

Name Type Description Default
agents Sequence[str | AnyAgent[TDeps, Any]] | None

List of agent names or instances (all if None)

None
model_override str | None

Optional model to use for all agents

None
environment_override StrPath | Config | None

Optional environment for all agents

None
shared_prompt str | None

Optional prompt for all agents

None
shared_deps TDeps | None

Optional shared dependencies

None
Source code in src/llmling_agent/delegation/pool.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def create_group[TDeps](
    self,
    agents: Sequence[str | AnyAgent[TDeps, Any]] | None = None,
    *,
    model_override: str | None = None,
    environment_override: StrPath | Config | None = None,
    shared_prompt: str | None = None,
    shared_deps: TDeps | None = None,
) -> Team[TDeps]:
    """Create a group from agent names or instances.

    Args:
        agents: List of agent names or instances (all if None)
        model_override: Optional model to use for all agents
        environment_override: Optional environment for all agents
        shared_prompt: Optional prompt for all agents
        shared_deps: Optional shared dependencies
    """
    from llmling_agent.delegation.agentgroup import Team

    if agents is None:
        agents = list(self.agents.keys())

    # First resolve/configure agents
    resolved_agents: list[AnyAgent[TDeps, Any]] = []
    for agent in agents:
        if isinstance(agent, str):
            agent = self.get_agent(
                agent,
                model_override=model_override,
                environment_override=environment_override,
            )
        resolved_agents.append(agent)

    return Team(
        agents=resolved_agents,
        # pool=self,
        shared_prompt=shared_prompt,
        shared_deps=shared_deps,
    )

get_agent

get_agent(
    agent: str | Agent[Any],
    *,
    deps: TDeps,
    return_type: type[TResult],
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
    environment_override: StrPath | Config | None = None,
) -> StructuredAgent[TDeps, TResult]
get_agent(
    agent: str | Agent[Any],
    *,
    deps: TDeps,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
    environment_override: StrPath | Config | None = None,
) -> Agent[TDeps]
get_agent(
    agent: str | Agent[Any],
    *,
    return_type: type[TResult],
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
    environment_override: StrPath | Config | None = None,
) -> StructuredAgent[Any, TResult]
get_agent(
    agent: str | Agent[Any],
    *,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
    environment_override: StrPath | Config | None = None,
) -> Agent[Any]
get_agent(
    agent: str | Agent[Any],
    *,
    deps: TDeps | None = None,
    return_type: type[TResult] | None = None,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
    environment_override: StrPath | Config | None = None,
) -> AnyAgent[TDeps, TResult]

Get or wrap an agent.

Parameters:

Name Type Description Default
agent str | Agent[Any]

Either agent name or instance

required
deps TDeps | None

Dependencies for the agent

None
return_type type[TResult] | None

Optional type to make agent structured

None
model_override str | None

Optional model override

None
session SessionIdType | SessionQuery

Optional session ID or Session query to recover conversation

None
environment_override StrPath | Config | None

Optional environment configuration: - Path to environment file - Complete Config instance - None to use agent's default environment

None

Returns:

Type Description
AnyAgent[TDeps, TResult]

Either regular Agent or StructuredAgent depending on return_type

Raises:

Type Description
KeyError

If agent name not found

ValueError

If environment configuration is invalid

Source code in src/llmling_agent/delegation/pool.py
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
def get_agent[TDeps, TResult](
    self,
    agent: str | Agent[Any],
    *,
    deps: TDeps | None = None,
    return_type: type[TResult] | None = None,
    model_override: str | None = None,
    session: SessionIdType | SessionQuery = None,
    environment_override: StrPath | Config | None = None,
) -> AnyAgent[TDeps, TResult]:
    """Get or wrap an agent.

    Args:
        agent: Either agent name or instance
        deps: Dependencies for the agent
        return_type: Optional type to make agent structured
        model_override: Optional model override
        session: Optional session ID or Session query to recover conversation
        environment_override: Optional environment configuration:
            - Path to environment file
            - Complete Config instance
            - None to use agent's default environment

    Returns:
        Either regular Agent or StructuredAgent depending on return_type

    Raises:
        KeyError: If agent name not found
        ValueError: If environment configuration is invalid
    """
    # Get base agent
    base = agent if isinstance(agent, Agent) else self.agents[agent]
    if deps is not None:
        base.context = base.context or AgentContext[TDeps].create_default(base.name)
        base.context.data = deps

    # Apply overrides
    if model_override:
        base.set_model(model_override)  # type: ignore

    if session:
        base.conversation.load_history_from_database(session=session)
    match environment_override:
        case Config():
            base.context.runtime = RuntimeConfig.from_config(environment_override)
        case str() | PathLike():
            base.context.runtime = RuntimeConfig.from_file(environment_override)

    # Wrap in StructuredAgent if return_type provided
    if return_type is not None:
        return StructuredAgent[Any, TResult](base, return_type)

    return base

list_agents

list_agents() -> list[str]

List available agent names.

Source code in src/llmling_agent/delegation/pool.py
600
601
602
def list_agents(self) -> list[str]:
    """List available agent names."""
    return list(self.manifest.agents)

open async classmethod

open(
    config_path: StrPath | AgentsManifest[TDeps, TResult] | None = None,
    *,
    agents: list[str] | None = None,
    connect_agents: bool = True,
    confirmation_callback: ConfirmationCallback | None = None,
) -> AsyncIterator[AgentPool]

Open an agent pool from configuration.

Parameters:

Name Type Description Default
config_path StrPath | AgentsManifest[TDeps, TResult] | None

Path to agent configuration file or manifest

None
agents list[str] | None

Optional list of agent names to initialize

None
connect_agents bool

Whether to set up forwarding connections

True
confirmation_callback ConfirmationCallback | None

Callback to confirm agent tool selection

None

Yields:

Type Description
AsyncIterator[AgentPool]

Configured agent pool

Source code in src/llmling_agent/delegation/pool.py
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
@classmethod
@asynccontextmanager
async def open[TDeps, TResult](
    cls,
    config_path: StrPath | AgentsManifest[TDeps, TResult] | None = None,
    *,
    agents: list[str] | None = None,
    connect_agents: bool = True,
    confirmation_callback: ConfirmationCallback | None = None,
) -> AsyncIterator[AgentPool]:
    """Open an agent pool from configuration.

    Args:
        config_path: Path to agent configuration file or manifest
        agents: Optional list of agent names to initialize
        connect_agents: Whether to set up forwarding connections
        confirmation_callback: Callback to confirm agent tool selection

    Yields:
        Configured agent pool
    """
    from llmling_agent.models import AgentsManifest

    match config_path:
        case None:
            manifest = AgentsManifest[Any, Any]()
        case str():
            manifest = AgentsManifest[Any, Any].from_file(config_path)
        case AgentsManifest():
            manifest = config_path
        case _:
            msg = f"Invalid config path: {config_path}"
            raise ValueError(msg)
    pool = cls(
        manifest,
        agents_to_load=agents,
        connect_agents=connect_agents,
        confirmation_callback=confirmation_callback,
    )
    try:
        async with pool:
            yield pool
    finally:
        await pool.cleanup()

setup_agent_workers

setup_agent_workers(agent: AnyAgent[Any, Any], workers: list[WorkerConfig])

Set up workers for an agent from configuration.

Source code in src/llmling_agent/delegation/pool.py
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
def setup_agent_workers(self, agent: AnyAgent[Any, Any], workers: list[WorkerConfig]):
    """Set up workers for an agent from configuration."""
    for worker_config in workers:
        try:
            worker = self.get_agent(worker_config.name)
            agent.register_worker(
                worker,
                name=worker_config.name,
                reset_history_on_run=worker_config.reset_history_on_run,
                pass_message_history=worker_config.pass_message_history,
                share_context=worker_config.share_context,
            )
        except KeyError as e:
            msg = f"Worker agent {worker_config.name!r} not found"
            raise ValueError(msg) from e

start_supervision

start_supervision() -> OptionalAwaitable[None]

Start supervision interface.

Can be called either synchronously or asynchronously:

Sync usage:

start_supervision(pool)

Async usage:

await start_supervision(pool)

Source code in src/llmling_agent/delegation/pool.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def start_supervision(self) -> OptionalAwaitable[None]:
    """Start supervision interface.

    Can be called either synchronously or asynchronously:

    # Sync usage:
    start_supervision(pool)

    # Async usage:
    await start_supervision(pool)
    """
    from llmling_agent.delegation.supervisor_ui import SupervisorApp

    app = SupervisorApp(self)
    if asyncio.get_event_loop().is_running():
        # We're in an async context
        return app.run_async()
    # We're in a sync context
    app.run()
    return None

Show source on GitHub