Skip to content

mcp_server

Class info

Classes

Name Children Inherits
MCPClient
llmling_agent.mcp_server.client
FastMCP-based client for communicating with MCP servers.

    🛈 DocStrings

    MCP server integration for LLMling agent.

    MCPClient

    FastMCP-based client for communicating with MCP servers.

    Source code in src/llmling_agent/mcp_server/client.py
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    318
    319
    320
    321
    322
    323
    324
    325
    326
    327
    328
    329
    330
    331
    332
    333
    334
    335
    336
    337
    338
    339
    340
    341
    class MCPClient:
        """FastMCP-based client for communicating with MCP servers."""
    
        def __init__(
            self,
            config: MCPServerConfig,
            elicitation_callback: ElicitationHandler | None = None,
            sampling_callback: ClientSamplingHandler | None = None,
            progress_handler: RichProgressCallback | None = None,
            message_handler: MessageHandlerT | MessageHandler | None = None,
            accessible_roots: list[str] | None = None,
            tool_change_callback: Callable[[], Awaitable[None]] | None = None,
            prompt_change_callback: Callable[[], Awaitable[None]] | None = None,
            resource_change_callback: Callable[[], Awaitable[None]] | None = None,
        ):
            self._elicitation_callback = elicitation_callback
            self.config = config
            self._sampling_callback = sampling_callback
            self._progress_handler = MultiEventHandler[RichProgressCallback](
                handlers=[progress_handler] if progress_handler else []
            )
            # Store message handler or mark for lazy creation
            self._message_handler = message_handler
            self._accessible_roots = accessible_roots or []
            self._tool_change_callback = tool_change_callback
            self._prompt_change_callback = prompt_change_callback
            self._resource_change_callback = resource_change_callback
            self._client = self._get_client(self.config)
    
        @property
        def connected(self) -> bool:
            """Check if client is connected by examining session state."""
            return self._client.is_connected()
    
        async def __aenter__(self) -> Self:
            """Enter context manager."""
            try:
                # First attempt with configured auth
                await self._client.__aenter__()
    
            except Exception as first_error:
                # OAuth fallback for HTTP/SSE if not already using OAuth
                if (
                    not isinstance(self.config, StdioMCPServerConfig)
                    and not self.config.auth.oauth
                ):
                    try:
                        with contextlib.suppress(Exception):
                            await self._client.__aexit__(None, None, None)
                        self._client = self._get_client(self.config, force_oauth=True)
                        await self._client.__aenter__()
                        logger.info("Connected with OAuth fallback")
                    except Exception:  # noqa: BLE001
                        raise first_error from None
                else:
                    raise
    
            return self
    
        async def __aexit__(self, *args: object):
            """Exit context manager and cleanup."""
            try:
                await self._client.__aexit__(None, None, None)
            except Exception as e:  # noqa: BLE001
                logger.warning("Error during FastMCP client cleanup", error=e)
    
        def get_resource_fs(self):
            """Get a filesystem for accessing MCP resources."""
            from upathtools.filesystems.mcp_fs import MCPFileSystem
    
            return MCPFileSystem(client=self._client)
    
        async def _log_handler(self, message: LogMessage) -> None:
            """Handle server log messages."""
            level = MCP_TO_LOGGING.get(message.level, logging.INFO)
            logger.log(level, "MCP Server: ", data=message.data)
    
        def _get_client(self, config: MCPServerConfig, force_oauth: bool = False):
            """Create FastMCP client based on config."""
            import fastmcp
            from fastmcp.client import SSETransport, StreamableHttpTransport
            from fastmcp.client.transports import StdioTransport
    
            transport: ClientTransport
            # Create transport based on config type
            match config:
                case StdioMCPServerConfig(command=command, args=args):
                    env = config.get_env_vars()
                    transport = StdioTransport(command=command, args=args, env=env)
                    oauth = False
                    if force_oauth:
                        msg = "OAuth is not supported for StdioMCPServerConfig"
                        raise ValueError(msg)
    
                case SSEMCPServerConfig(url=url, headers=headers, auth=auth):
                    transport = SSETransport(url=url, headers=headers)
                    oauth = auth.oauth
    
                case StreamableHTTPMCPServerConfig(url=url, headers=headers, auth=auth):
                    transport = StreamableHttpTransport(url=url, headers=headers)
                    oauth = auth.oauth
                case _:
                    msg = f"Unsupported server config type: {type(config)}"
                    raise ValueError(msg)
    
            # Create message handler if needed
            msg_handler = self._message_handler or MCPMessageHandler(
                self,
                self._tool_change_callback,
                self._prompt_change_callback,
                self._resource_change_callback,
            )
            return fastmcp.Client(
                transport,
                log_handler=self._log_handler,
                roots=self._accessible_roots,
                timeout=config.timeout,
                elicitation_handler=self._elicitation_callback,
                sampling_handler=self._sampling_callback,
                message_handler=msg_handler,
                auth="oauth" if (force_oauth or oauth) else None,
            )
    
        async def list_tools(self) -> list[MCPTool]:
            """Get available tools directly from the server."""
            if not self.connected:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
    
            try:
                tools = await self._client.list_tools()
                logger.debug("Listed tools from MCP server", num_tools=len(tools))
            except Exception as e:  # noqa: BLE001
                logger.warning("Failed to list tools", error=e)
                return []
            else:
                return tools
    
        async def list_prompts(self) -> list[MCPPrompt]:
            """Get available prompts from the server."""
            if not self.connected:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
    
            try:
                return await self._client.list_prompts()
            except Exception as e:  # noqa: BLE001
                logger.debug("Failed to list prompts", error=e)
                return []
    
        async def list_resources(self) -> list[MCPResource]:
            """Get available resources from the server."""
            if not self.connected:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
    
            try:
                return await self._client.list_resources()
            except Exception as e:
                msg = f"Failed to list resources: {e}"
                raise RuntimeError(msg) from e
    
        async def get_prompt(
            self, name: str, arguments: dict[str, str] | None = None
        ) -> GetPromptResult:
            """Get a specific prompt's content."""
            if not self.connected:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
    
            try:
                return await self._client.get_prompt_mcp(name, arguments)
            except Exception as e:
                msg = f"Failed to get prompt {name!r}: {e}"
                raise RuntimeError(msg) from e
    
        def convert_tool(self, tool: MCPTool) -> Tool:
            """Create a properly typed callable from MCP tool schema."""
    
            async def tool_callable(ctx: RunContext, **kwargs: Any) -> str | Any | ToolReturn:
                """Dynamically generated MCP tool wrapper."""
                # Filter out None values for optional params
                schema_props = tool.inputSchema.get("properties", {})
                required_props = set(tool.inputSchema.get("required", []))
                filtered_kwargs = {
                    k: v
                    for k, v in kwargs.items()
                    if k in required_props or (k in schema_props and v is not None)
                }
                return await self.call_tool(tool.name, ctx, filtered_kwargs)
    
            # Set proper signature and annotations with RunContext support
            schema = mcp_tool_to_fn_schema(tool)
            fn_schema = FunctionSchema.from_dict(schema)
            sig = fn_schema.to_python_signature()
            tool_callable.__signature__ = _create_tool_signature_with_context(sig)  # type: ignore
            annotations = fn_schema.get_annotations()
            annotations["ctx"] = RunContext
            # Update return annotation to support multiple types
            annotations["return"] = str | Any | ToolReturn  # type: ignore
            tool_callable.__annotations__ = annotations
            tool_callable.__name__ = tool.name
            tool_callable.__doc__ = tool.description or "No description provided."
            return Tool.from_callable(tool_callable, source="mcp")
    
        def _create_final_progress_handler(
            self, tool_name: str, tool_call_id: str, tool_input: dict[str, Any]
        ) -> ProgressHandler:
            """Create a FastMCP-compatible progress handler with baked-in context."""
    
            async def fastmcp_progress_handler(
                progress: float, total: float | None, message: str | None
            ):
                await self._progress_handler(
                    progress, total, message, tool_name, tool_call_id, tool_input
                )
    
            return fastmcp_progress_handler
    
        async def call_tool(
            self,
            name: str,
            run_context: RunContext,
            arguments: dict[str, Any] | None = None,
        ) -> ToolReturn | str | Any:
            """Call an MCP tool with full PydanticAI return type support."""
            if not self.connected:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
    
            # Create progress handler if we have handler
            progress_handler = None
            if self._progress_handler:
                if run_context.tool_call_id and run_context.tool_name:
                    # Extract tool args from message history
                    tool_input = extract_tool_call_args(
                        run_context.messages, run_context.tool_call_id
                    )
                    progress_handler = self._create_final_progress_handler(
                        run_context.tool_name, run_context.tool_call_id, tool_input
                    )
                else:
                    # Fallback to using passed arguments (direct tool call)
                    progress_handler = self._create_final_progress_handler(
                        name, run_context.tool_call_id or "", arguments or {}
                    )
            try:
                result = await self._client.call_tool(
                    name, arguments, progress_handler=progress_handler
                )
                content = await self._convert_mcp_content(result.content)
                # Decision logic for return type
                match (result.data is not None, bool(content)):
                    case (True, True):  # Both structured data and rich content -> ToolReturn
                        return ToolReturn(return_value=result.data, content=content)
                    case (True, False):  # Only structured data -> return directly
                        return result.data
                    case (False, True):  # Only content -> ToolReturn with content
                        msg = "Tool executed successfully"
                        return ToolReturn(return_value=msg, content=content)
                    case (False, False):  # Fallback to text extraction
                        return extract_text_content(result.content)
                    case _:  # Handle unexpected cases
                        msg = f"Unexpected MCP content: {result.content}"
                        raise ValueError(msg)  # noqa: TRY301
            except Exception as e:
                msg = f"MCP tool call failed: {e}"
                raise RuntimeError(msg) from e
    
        async def _convert_mcp_content(
            self,
            mcp_content: Sequence[ContentBlock | TextResourceContents | BlobResourceContents],
        ) -> list[str | BinaryContent]:
            """Convert MCP content blocks to PydanticAI content types."""
            from llmling_agent.mcp_server.conversions import convert_mcp_content
    
            return await convert_mcp_content(mcp_content)
    

    connected property

    connected: bool
    

    Check if client is connected by examining session state.

    __aenter__ async

    __aenter__() -> Self
    

    Enter context manager.

    Source code in src/llmling_agent/mcp_server/client.py
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    async def __aenter__(self) -> Self:
        """Connect the underlying client, retrying once with OAuth when allowed.

        The OAuth retry only happens when the config is not a
        ``StdioMCPServerConfig`` and ``config.auth.oauth`` is falsy. If the retry
        fails as well, the error from the first attempt is raised.
        """
        try:
            await self._client.__aenter__()
        except Exception as first_error:
            # Stdio servers and configs already using OAuth have no fallback.
            fallback_ruled_out = (
                isinstance(self.config, StdioMCPServerConfig) or self.config.auth.oauth
            )
            if fallback_ruled_out:
                raise
            try:
                # Best-effort close of the failed client before replacing it.
                with contextlib.suppress(Exception):
                    await self._client.__aexit__(None, None, None)
                self._client = self._get_client(self.config, force_oauth=True)
                await self._client.__aenter__()
                logger.info("Connected with OAuth fallback")
            except Exception:  # noqa: BLE001
                # Surface the original failure rather than the fallback's.
                raise first_error from None

        return self
    
    

    __aexit__ async

    __aexit__(*args: object)
    

    Exit context manager and cleanup.

    Source code in src/llmling_agent/mcp_server/client.py
    124
    125
    126
    127
    128
    129
    async def __aexit__(self, *args: object):
        """Close the underlying client, logging (not raising) cleanup errors.

        The received exception details are not forwarded; the wrapped client is
        always exited with ``(None, None, None)``.
        """
        client = self._client
        try:
            await client.__aexit__(None, None, None)
        except Exception as exc:  # noqa: BLE001
            logger.warning("Error during FastMCP client cleanup", error=exc)
    
    

    call_tool async

    call_tool(
        name: str, run_context: RunContext, arguments: dict[str, Any] | None = None
    ) -> ToolReturn | str | Any
    

    Call an MCP tool with full PydanticAI return type support.

    Source code in src/llmling_agent/mcp_server/client.py
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    318
    319
    320
    321
    322
    323
    324
    325
    326
    327
    328
    329
    330
    331
    332
    async def call_tool(
        self,
        name: str,
        run_context: RunContext,
        arguments: dict[str, Any] | None = None,
    ) -> ToolReturn | str | Any:
        """Call an MCP tool with full PydanticAI return type support.

        Args:
            name: Name of the tool to call on the server.
            run_context: Run context; its ``tool_call_id``, ``tool_name`` and
                ``messages`` supply the progress-reporting context.
            arguments: Arguments forwarded to the tool.

        Returns:
            A ``ToolReturn`` when the converted content is non-empty (its
            ``return_value`` is the structured data if present, otherwise a fixed
            success message); the structured data alone when there is no content;
            otherwise the text extracted from the raw content.

        Raises:
            RuntimeError: If the client is not connected, or any step of the call
                or result conversion fails.
        """
        if not self.connected:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)

        # Create progress handler if we have handler
        progress_handler = None
        if self._progress_handler:
            if run_context.tool_call_id and run_context.tool_name:
                # Extract tool args from message history
                tool_input = extract_tool_call_args(
                    run_context.messages, run_context.tool_call_id
                )
                progress_handler = self._create_final_progress_handler(
                    run_context.tool_name, run_context.tool_call_id, tool_input
                )
            else:
                # Fallback to using passed arguments (direct tool call)
                progress_handler = self._create_final_progress_handler(
                    name, run_context.tool_call_id or "", arguments or {}
                )
        try:
            result = await self._client.call_tool(
                name, arguments, progress_handler=progress_handler
            )
            content = await self._convert_mcp_content(result.content)
            has_data = result.data is not None
            if content:
                # Rich content present: wrap it, preferring structured data as
                # the return value when the server supplied any.
                return_value = result.data if has_data else "Tool executed successfully"
                return ToolReturn(return_value=return_value, content=content)
            if has_data:
                # Only structured data -> return directly
                return result.data
            # Neither structured data nor content: fall back to text extraction
            return extract_text_content(result.content)
        except Exception as e:
            msg = f"MCP tool call failed: {e}"
            raise RuntimeError(msg) from e
    
    

    convert_tool

    convert_tool(tool: Tool) -> Tool
    

    Create a properly typed callable from MCP tool schema.

    Source code in src/llmling_agent/mcp_server/client.py
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    def convert_tool(self, tool: MCPTool) -> Tool:
        """Wrap an MCP tool definition in a ``Tool`` backed by ``call_tool``.

        The generated coroutine takes a ``RunContext`` plus keyword arguments and
        forwards them to ``self.call_tool``. Its signature, annotations, name and
        docstring are derived from the MCP tool definition.
        """

        async def tool_callable(ctx: RunContext, **kwargs: Any) -> str | Any | ToolReturn:
            """Dynamically generated MCP tool wrapper."""
            properties = tool.inputSchema.get("properties", {})
            required = set(tool.inputSchema.get("required", []))
            call_args: dict[str, Any] = {}
            for key, value in kwargs.items():
                # Required params always pass through; optional ones only when
                # they are declared in the schema and actually set.
                if key in required or (key in properties and value is not None):
                    call_args[key] = value
            return await self.call_tool(tool.name, ctx, call_args)

        # Derive the Python-facing signature and annotations from the schema.
        fn_schema = FunctionSchema.from_dict(mcp_tool_to_fn_schema(tool))
        python_sig = fn_schema.to_python_signature()
        tool_callable.__signature__ = _create_tool_signature_with_context(python_sig)  # type: ignore
        hints = fn_schema.get_annotations()
        hints["ctx"] = RunContext
        # The wrapper may return plain text, arbitrary data or a ToolReturn.
        hints["return"] = str | Any | ToolReturn  # type: ignore
        tool_callable.__annotations__ = hints
        tool_callable.__name__ = tool.name
        tool_callable.__doc__ = tool.description or "No description provided."
        return Tool.from_callable(tool_callable, source="mcp")
    
    

    get_prompt async

    get_prompt(name: str, arguments: dict[str, str] | None = None) -> GetPromptResult
    

    Get a specific prompt's content.

    Source code in src/llmling_agent/mcp_server/client.py
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    async def get_prompt(
        self, name: str, arguments: dict[str, str] | None = None
    ) -> GetPromptResult:
        """Fetch one prompt from the server by name.

        Raises:
            RuntimeError: If the client is not connected or the request fails.
        """
        if not self.connected:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)

        try:
            prompt = await self._client.get_prompt_mcp(name, arguments)
        except Exception as exc:
            msg = f"Failed to get prompt {name!r}: {exc}"
            raise RuntimeError(msg) from exc
        return prompt
    
    

    get_resource_fs

    get_resource_fs()
    

    Get a filesystem for accessing MCP resources.

    Source code in src/llmling_agent/mcp_server/client.py
    131
    132
    133
    134
    135
    def get_resource_fs(self):
        """Return an ``MCPFileSystem`` built on this instance's FastMCP client."""
        # Function-scope import, as in the original.
        from upathtools.filesystems.mcp_fs import MCPFileSystem

        resource_fs = MCPFileSystem(client=self._client)
        return resource_fs
    
    

    list_prompts async

    list_prompts() -> list[Prompt]
    

    Get available prompts from the server.

    Source code in src/llmling_agent/mcp_server/client.py
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    async def list_prompts(self) -> list[MCPPrompt]:
        """Return the server's prompts, or ``[]`` if the listing call fails.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.connected:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)

        try:
            prompts = await self._client.list_prompts()
        except Exception as exc:  # noqa: BLE001
            # A failed listing is only logged; callers get an empty list.
            logger.debug("Failed to list prompts", error=exc)
            return []
        return prompts
    
    

    list_resources async

    list_resources() -> list[Resource]
    

    Get available resources from the server.

    Source code in src/llmling_agent/mcp_server/client.py
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    async def list_resources(self) -> list[MCPResource]:
        """Return the resources reported by the server.

        Raises:
            RuntimeError: If the client is not connected or the listing fails.
        """
        if not self.connected:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)

        try:
            resources = await self._client.list_resources()
        except Exception as exc:
            msg = f"Failed to list resources: {exc}"
            raise RuntimeError(msg) from exc
        return resources
    
    

    list_tools async

    list_tools() -> list[Tool]
    

    Get available tools directly from the server.

    Source code in src/llmling_agent/mcp_server/client.py
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    async def list_tools(self) -> list[MCPTool]:
        """Return the server's tools, or ``[]`` if the listing call fails.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.connected:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)

        try:
            found = await self._client.list_tools()
            logger.debug("Listed tools from MCP server", num_tools=len(found))
        except Exception as exc:  # noqa: BLE001
            # A failed listing is logged as a warning; callers get an empty list.
            logger.warning("Failed to list tools", error=exc)
            return []
        return found