AgentPoolView

User's view and control point for interacting with an agent in a pool.

This class provides a focused way to interact with one primary agent that is part of a larger agent pool. Through this view, users can:

1. Interact with the primary agent directly
2. Manage connections to other agents in the pool
3. Control tool availability and settings
4. Handle commands and responses

Think of it as looking at the agent pool through the lens of one specific agent, while still being able to utilize the pool's collaborative capabilities.
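
A minimal usage sketch, assuming an already-constructed Agent instance (my_agent is a placeholder) and that AgentPoolView is importable from the module documented below:

from llmling_agent.chat_session.base import AgentPoolView  # assumed import path

async def chat_once(my_agent) -> None:
    # create() constructs the view and runs its async initialization in one step
    view = await AgentPoolView.create(my_agent)
    # Send a plain message to the primary agent and print the reply
    reply = await view.send_message("Summarize the project README")
    print(reply.content)
    # Release pool-related resources when done
    await view.cleanup()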

Source code in src/llmling_agent/chat_session/base.py
class AgentPoolView:
    """User's view and control point for interacting with an agent in a pool.

    This class provides a focused way to interact with one primary agent that is part
    of a larger agent pool. Through this view, users can:
    1. Interact with the primary agent directly
    2. Manage connections to other agents in the pool
    3. Control tool availability and settings
    4. Handle commands and responses

    Think of it as looking at the agent pool through the lens of one specific agent,
    while still being able to utilize the pool's collaborative capabilities.
    """

    @dataclass(frozen=True)
    class SessionReset:
        """Emitted when session is reset."""

        session_id: str
        previous_tools: dict[str, bool]
        new_tools: dict[str, bool]
        timestamp: datetime = field(default_factory=datetime.now)

    history_cleared = Signal(ConversationManager.HistoryCleared)
    session_reset = Signal(SessionReset)
    tool_added = Signal(str, ToolInfo)
    tool_removed = Signal(str)  # tool_name
    tool_changed = Signal(str, ToolInfo)  # name, new_info
    agent_connected = Signal(Agent)

    def __init__(
        self,
        agent: AnyAgent[Any, Any],
        *,
        pool: AgentPool | None = None,
        wait_chain: bool = True,
    ):
        """Initialize chat session.

        Args:
            agent: The LLMling agent to use
            pool: Optional agent pool for multi-agent interactions
            wait_chain: Whether to wait for chain completion
        """
        # Basic setup that doesn't need async
        self._agent = agent
        self._pool = pool
        self.wait_chain = wait_chain
        # forward ToolManager signals to ours
        self._agent.tools.events.added.connect(self.tool_added.emit)
        self._agent.tools.events.removed.connect(self.tool_removed.emit)
        self._agent.tools.events.changed.connect(self.tool_changed.emit)
        self._agent.conversation.history_cleared.connect(self.history_cleared.emit)
        self._initialized = False  # Track initialization state
        file_path = HISTORY_DIR / f"{agent.name}.history"
        self.commands = CommandStore(history_file=file_path, enable_system_commands=True)
        self.start_time = datetime.now()
        self._state = SessionState(current_model=self._agent.model_name)

    @classmethod
    async def create(
        cls,
        agent: Agent[Any],
        *,
        pool: AgentPool | None = None,
        wait_chain: bool = True,
    ) -> AgentPoolView:
        """Create and initialize a new agent pool view.

        Args:
            agent: The primary agent to interact with
            pool: Optional agent pool for multi-agent interactions
            wait_chain: Whether to wait for chain completion

        Returns:
            Initialized AgentPoolView
        """
        view = cls(agent, pool=pool, wait_chain=wait_chain)
        await view.initialize()
        return view

    @property
    def pool(self) -> AgentPool | None:
        """Get the agent pool if available."""
        return self._pool

    async def connect_to(self, target: str, wait: bool | None = None):
        """Connect to another agent.

        Args:
            target: Name of target agent
            wait: Override session's wait_chain setting

        Raises:
            ValueError: If target agent not found or pool not available
        """
        logger.debug("Connecting to %s (wait=%s)", target, wait)
        if not self._pool:
            msg = "No agent pool available"
            raise ValueError(msg)

        try:
            target_agent = self._pool.get_agent(target)
        except KeyError as e:
            msg = f"Target agent not found: {target}"
            raise ValueError(msg) from e

        self._agent.pass_results_to(target_agent)
        self.agent_connected.emit(target_agent)

        if wait is not None:
            self.wait_chain = wait

    def _ensure_initialized(self):
        """Check if session is initialized."""
        if not self._initialized:
            msg = "Session not initialized. Call initialize() first."
            raise RuntimeError(msg)

    async def initialize(self):
        """Initialize async resources and load data."""
        if self._initialized:
            return

        # Load command history
        await self.commands.initialize()
        for cmd in get_commands():
            self.commands.register_command(cmd)

        self._initialized = True
        logger.debug("Initialized chat session for agent %r", self._agent.name)

    async def cleanup(self):
        """Clean up session resources."""
        if self._pool:
            await self._agent.disconnect_all()

    def add_command(self, command: str):
        """Add command to history."""
        if not command.strip():
            return
        from llmling_agent.storage.models import CommandHistory

        id_ = str(self._agent.conversation.id)
        CommandHistory.log(agent_name=self._agent.name, session_id=id_, command=command)

    def get_commands(
        self, limit: int | None = None, current_session_only: bool = False
    ) -> list[str]:
        """Get command history ordered by newest first."""
        from llmling_agent.storage.models import CommandHistory

        return CommandHistory.get_commands(
            agent_name=self._agent.name,
            session_id=str(self._agent.conversation.id),
            limit=limit,
            current_session_only=current_session_only,
        )

    async def clear(self):
        """Clear chat history."""
        self._agent.conversation.clear()

    async def reset(self):
        """Reset session state."""
        old_tools = self.tools.list_tools()
        self._agent.conversation.clear()
        self.tools.reset_states()
        new_tools = self.tools.list_tools()

        event = self.SessionReset(
            session_id=str(self._agent.conversation.id),
            previous_tools=old_tools,
            new_tools=new_tools,
        )
        self.session_reset.emit(event)

    async def handle_command(
        self,
        command_str: str,
        output: OutputWriter,
        metadata: dict[str, Any] | None = None,
    ):
        """Handle a slash command.

        Args:
            command_str: Command string without leading slash
            output: Output writer implementation
            metadata: Optional interface-specific metadata
        """
        self._ensure_initialized()
        meta = metadata or {}
        ctx = self.commands.create_context(self, output_writer=output, metadata=meta)
        await self.commands.execute_command(command_str, ctx)

    async def send_slash_command(
        self,
        content: str,
        *,
        output: OutputWriter | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> ChatMessage[str]:
        writer = output or DefaultOutputWriter()
        try:
            await self.handle_command(content[1:], output=writer, metadata=metadata)
            return ChatMessage(content="", role="system")
        except ExitCommandError:
            # Re-raise without wrapping in CommandError
            raise
        except CommandError as e:
            return ChatMessage(content=f"Command error: {e}", role="system")

    @overload
    async def send_message(
        self,
        content: str,
        *,
        stream: Literal[False] = False,
        output: OutputWriter | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> ChatMessage[str]: ...

    @overload
    async def send_message(
        self,
        content: str,
        *,
        stream: Literal[True],
        output: OutputWriter | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> AsyncIterator[ChatMessage[str]]: ...

    async def send_message(
        self,
        content: str,
        *,
        stream: bool = False,
        output: OutputWriter | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> ChatMessage[str] | AsyncIterator[ChatMessage[str]]:
        """Send a message and get response(s)."""
        self._ensure_initialized()
        if not content.strip():
            msg = "Message cannot be empty"
            raise ValueError(msg)

        if content.startswith("/"):
            return await self.send_slash_command(
                content,
                output=output,
                metadata=metadata,
            )
        try:
            if stream:
                return self._stream_message(content)
            return await self._send_normal(content)

        except Exception as e:
            logger.exception("Error processing message")
            msg = f"Error processing message: {e}"
            raise ChatSessionConfigError(msg) from e

    async def _send_normal(self, content: str) -> ChatMessage[str]:
        """Send message and get single response."""
        result = await self._agent.run(content)
        text_message = result.to_text_message()

        # Update session state metrics
        self._state.message_count += 2  # User and assistant messages
        if text_message.cost_info:
            self._state.update_tokens(text_message)
            self._state.total_cost = float(text_message.cost_info.total_cost)
        if text_message.response_time:
            self._state.last_response_time = text_message.response_time

        # Add chain waiting if enabled
        if self.wait_chain and self._pool:
            await self._agent.wait_for_chain()

        return text_message

    async def _stream_message(self, content: str) -> AsyncIterator[ChatMessage[str]]:
        """Send message and stream responses."""
        async with self._agent.run_stream(content) as stream_result:
            # Stream intermediate chunks
            async for response in stream_result.stream():
                yield ChatMessage[str](content=str(response), role="assistant")

            # Final message with complete metrics after stream completes
            start_time = time.perf_counter()

            # Get usage info if available
            usage = stream_result.usage()
            cost_info = (
                await TokenCost.from_usage(
                    usage, self._agent.model_name, content, response
                )
                if usage and self._agent.model_name
                else None
            )

            # Create final status message with all metrics
            final_msg = ChatMessage[str](
                content="",  # Empty content for final status message
                role="assistant",
                name=self._agent.name,
                model=self._agent.model_name,
                message_id=str(uuid4()),
                cost_info=cost_info,
                response_time=time.perf_counter() - start_time,
            )

            # Update session state
            self._state.message_count += 2  # User and assistant messages
            self._state.update_tokens(final_msg)

            # Add chain waiting if enabled
            if self.wait_chain and self._pool:
                await self._agent.wait_for_chain()

            yield final_msg

    @property
    def tools(self) -> ToolManager:
        """Get current tool states."""
        return self._agent.tools
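
Because the view forwards ToolManager and conversation events through its own signals, a frontend can subscribe to them directly. A small sketch, assuming view is an initialized AgentPoolView (the handler names are illustrative, not part of the API):

# Illustrative signal wiring; `view` is an initialized AgentPoolView.
def on_tool_added(name: str, info) -> None:
    print(f"tool registered: {name}")

def on_agent_connected(agent) -> None:
    print(f"now forwarding results to: {agent.name}")

view.tool_added.connect(on_tool_added)
view.agent_connected.connect(on_agent_connected)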

pool property

pool: AgentPool | None

Get the agent pool if available.

tools property

tools: ToolManager

Get current tool states.
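
For example, the enable/disable state of every registered tool can be read through the manager; list_tools() is the same call reset() uses to snapshot tool states. A sketch, assuming view is an initialized AgentPoolView:

# Sketch: inspect tool states through the forwarded ToolManager.
states: dict[str, bool] = view.tools.list_tools()  # tool name -> enabled?
for name, enabled in states.items():
    print(f"{name}: {'enabled' if enabled else 'disabled'}")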

SessionReset dataclass

Emitted when session is reset.

Source code in src/llmling_agent/chat_session/base.py
@dataclass(frozen=True)
class SessionReset:
    """Emitted when session is reset."""

    session_id: str
    previous_tools: dict[str, bool]
    new_tools: dict[str, bool]
    timestamp: datetime = field(default_factory=datetime.now)
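
A handler connected to session_reset receives this event and can compare the tool snapshots. A small sketch, assuming view is an initialized AgentPoolView:

# Sketch: observe session resets and report which tool states changed.
def on_session_reset(event: AgentPoolView.SessionReset) -> None:
    changed = {
        name: enabled
        for name, enabled in event.new_tools.items()
        if event.previous_tools.get(name) != enabled
    }
    print(f"session {event.session_id} reset at {event.timestamp}; changed tools: {changed}")

view.session_reset.connect(on_session_reset)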

__init__

__init__(
    agent: AnyAgent[Any, Any], *, pool: AgentPool | None = None, wait_chain: bool = True
)

Initialize chat session.

Parameters:

    agent (AnyAgent[Any, Any]): The LLMling agent to use. Required.
    pool (AgentPool | None): Optional agent pool for multi-agent interactions. Default: None.
    wait_chain (bool): Whether to wait for chain completion. Default: True.
Source code in src/llmling_agent/chat_session/base.py
def __init__(
    self,
    agent: AnyAgent[Any, Any],
    *,
    pool: AgentPool | None = None,
    wait_chain: bool = True,
):
    """Initialize chat session.

    Args:
        agent: The LLMling agent to use
        pool: Optional agent pool for multi-agent interactions
        wait_chain: Whether to wait for chain completion
    """
    # Basic setup that doesn't need async
    self._agent = agent
    self._pool = pool
    self.wait_chain = wait_chain
    # forward ToolManager signals to ours
    self._agent.tools.events.added.connect(self.tool_added.emit)
    self._agent.tools.events.removed.connect(self.tool_removed.emit)
    self._agent.tools.events.changed.connect(self.tool_changed.emit)
    self._agent.conversation.history_cleared.connect(self.history_cleared.emit)
    self._initialized = False  # Track initialization state
    file_path = HISTORY_DIR / f"{agent.name}.history"
    self.commands = CommandStore(history_file=file_path, enable_system_commands=True)
    self.start_time = datetime.now()
    self._state = SessionState(current_model=self._agent.model_name)
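
When the view is constructed directly rather than through create(), initialize() must still be awaited before sending messages or commands, otherwise _ensure_initialized() raises RuntimeError. A sketch (my_agent and my_pool are placeholders; runs inside an async context):

view = AgentPoolView(my_agent, pool=my_pool, wait_chain=False)
await view.initialize()  # loads command history and registers commands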

add_command

add_command(command: str)

Add command to history.

Source code in src/llmling_agent/chat_session/base.py
def add_command(self, command: str):
    """Add command to history."""
    if not command.strip():
        return
    from llmling_agent.storage.models import CommandHistory

    id_ = str(self._agent.conversation.id)
    CommandHistory.log(agent_name=self._agent.name, session_id=id_, command=command)

cleanup async

cleanup()

Clean up session resources.

Source code in src/llmling_agent/chat_session/base.py
async def cleanup(self):
    """Clean up session resources."""
    if self._pool:
        await self._agent.disconnect_all()

clear async

clear()

Clear chat history.

Source code in src/llmling_agent/chat_session/base.py
async def clear(self):
    """Clear chat history."""
    self._agent.conversation.clear()

connect_to async

connect_to(target: str, wait: bool | None = None)

Connect to another agent.

Parameters:

    target (str): Name of the target agent. Required.
    wait (bool | None): Override the session's wait_chain setting. Default: None.

Raises:

    ValueError: If the target agent is not found or no pool is available.

Source code in src/llmling_agent/chat_session/base.py
async def connect_to(self, target: str, wait: bool | None = None):
    """Connect to another agent.

    Args:
        target: Name of target agent
        wait: Override session's wait_chain setting

    Raises:
        ValueError: If target agent not found or pool not available
    """
    logger.debug("Connecting to %s (wait=%s)", target, wait)
    if not self._pool:
        msg = "No agent pool available"
        raise ValueError(msg)

    try:
        target_agent = self._pool.get_agent(target)
    except KeyError as e:
        msg = f"Target agent not found: {target}"
        raise ValueError(msg) from e

    self._agent.pass_results_to(target_agent)
    self.agent_connected.emit(target_agent)

    if wait is not None:
        self.wait_chain = wait
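
For example, if the pool contains agents named "summarizer" and "reviewer" (names are illustrative), the primary agent's results can be forwarded to them; inside an async context:

# Sketch: forward the primary agent's results to other pool members.
await view.connect_to("summarizer")           # raises ValueError if not in the pool
await view.connect_to("reviewer", wait=True)  # also enables chain waiting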

create async classmethod

create(
    agent: Agent[Any], *, pool: AgentPool | None = None, wait_chain: bool = True
) -> AgentPoolView

Create and initialize a new agent pool view.

Parameters:

    agent (Agent[Any]): The primary agent to interact with. Required.
    pool (AgentPool | None): Optional agent pool for multi-agent interactions. Default: None.
    wait_chain (bool): Whether to wait for chain completion. Default: True.

Returns:

    AgentPoolView: An initialized AgentPoolView.

Source code in src/llmling_agent/chat_session/base.py
@classmethod
async def create(
    cls,
    agent: Agent[Any],
    *,
    pool: AgentPool | None = None,
    wait_chain: bool = True,
) -> AgentPoolView:
    """Create and initialize a new agent pool view.

    Args:
        agent: The primary agent to interact with
        pool: Optional agent pool for multi-agent interactions
        wait_chain: Whether to wait for chain completion

    Returns:
        Initialized AgentPoolView
    """
    view = cls(agent, pool=pool, wait_chain=wait_chain)
    await view.initialize()
    return view
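
A short sketch of the factory in use, assuming an Agent and an AgentPool already exist (my_agent and my_pool are placeholders; inside an async context):

# Sketch: build a ready-to-use view in one awaited call.
view = await AgentPoolView.create(my_agent, pool=my_pool, wait_chain=True)
assert view.pool is my_pool  # the pool is exposed read-only via the property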

get_commands

get_commands(limit: int | None = None, current_session_only: bool = False) -> list[str]

Get command history ordered by newest first.

Source code in src/llmling_agent/chat_session/base.py
def get_commands(
    self, limit: int | None = None, current_session_only: bool = False
) -> list[str]:
    """Get command history ordered by newest first."""
    from llmling_agent.storage.models import CommandHistory

    return CommandHistory.get_commands(
        agent_name=self._agent.name,
        session_id=str(self._agent.conversation.id),
        limit=limit,
        current_session_only=current_session_only,
    )
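
Together with add_command(), this provides a simple persistent, per-agent command history. A sketch, assuming view is an initialized AgentPoolView (the command text is illustrative):

# Sketch: record a command, then read back the most recent entries.
view.add_command("/list-tools")
for cmd in view.get_commands(limit=5, current_session_only=True):
    print(cmd)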

handle_command async

handle_command(
    command_str: str, output: OutputWriter, metadata: dict[str, Any] | None = None
)

Handle a slash command.

Parameters:

    command_str (str): Command string without the leading slash. Required.
    output (OutputWriter): Output writer implementation. Required.
    metadata (dict[str, Any] | None): Optional interface-specific metadata. Default: None.
Source code in src/llmling_agent/chat_session/base.py
async def handle_command(
    self,
    command_str: str,
    output: OutputWriter,
    metadata: dict[str, Any] | None = None,
):
    """Handle a slash command.

    Args:
        command_str: Command string without leading slash
        output: Output writer implementation
        metadata: Optional interface-specific metadata
    """
    self._ensure_initialized()
    meta = metadata or {}
    ctx = self.commands.create_context(self, output_writer=output, metadata=meta)
    await self.commands.execute_command(command_str, ctx)
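
A sketch of dispatching a command programmatically; DefaultOutputWriter is the same fallback class send_slash_command uses (its import path is not shown here), and the command name and metadata are illustrative (inside an async context):

writer = DefaultOutputWriter()
await view.handle_command("help", output=writer, metadata={"source": "cli"})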

initialize async

initialize()

Initialize async resources and load data.

Source code in src/llmling_agent/chat_session/base.py
async def initialize(self):
    """Initialize async resources and load data."""
    if self._initialized:
        return

    # Load command history
    await self.commands.initialize()
    for cmd in get_commands():
        self.commands.register_command(cmd)

    self._initialized = True
    logger.debug("Initialized chat session for agent %r", self._agent.name)

reset async

reset()

Reset session state.

Source code in src/llmling_agent/chat_session/base.py
async def reset(self):
    """Reset session state."""
    old_tools = self.tools.list_tools()
    self._agent.conversation.clear()
    self.tools.reset_states()
    new_tools = self.tools.list_tools()

    event = self.SessionReset(
        session_id=str(self._agent.conversation.id),
        previous_tools=old_tools,
        new_tools=new_tools,
    )
    self.session_reset.emit(event)

send_message async

send_message(
    content: str,
    *,
    stream: Literal[False] = False,
    output: OutputWriter | None = None,
    metadata: dict[str, Any] | None = None,
) -> ChatMessage[str]
send_message(
    content: str,
    *,
    stream: Literal[True],
    output: OutputWriter | None = None,
    metadata: dict[str, Any] | None = None,
) -> AsyncIterator[ChatMessage[str]]
send_message(
    content: str,
    *,
    stream: bool = False,
    output: OutputWriter | None = None,
    metadata: dict[str, Any] | None = None,
) -> ChatMessage[str] | AsyncIterator[ChatMessage[str]]

Send a message and get response(s).

Source code in src/llmling_agent/chat_session/base.py
async def send_message(
    self,
    content: str,
    *,
    stream: bool = False,
    output: OutputWriter | None = None,
    metadata: dict[str, Any] | None = None,
) -> ChatMessage[str] | AsyncIterator[ChatMessage[str]]:
    """Send a message and get response(s)."""
    self._ensure_initialized()
    if not content.strip():
        msg = "Message cannot be empty"
        raise ValueError(msg)

    if content.startswith("/"):
        return await self.send_slash_command(
            content,
            output=output,
            metadata=metadata,
        )
    try:
        if stream:
            return self._stream_message(content)
        return await self._send_normal(content)

    except Exception as e:
        logger.exception("Error processing message")
        msg = f"Error processing message: {e}"
        raise ChatSessionConfigError(msg) from e
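
A sketch of both call shapes, assuming view is an initialized AgentPoolView and the prompts are illustrative; note that a leading "/" routes the text to the command handler instead of the agent (inside an async context):

# Non-streaming: a single ChatMessage comes back.
reply = await view.send_message("What changed in the last release?")
print(reply.content)

# Streaming: awaiting the call yields an async iterator of chunks.
async for chunk in await view.send_message("Explain the diff", stream=True):
    print(chunk.content, end="")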
