Skip to content

client

Class info

Classes

Name Children Inherits
MCPClient
llmling_agent.mcp_server.client
MCP client for communicating with MCP servers.

    🛈 DocStrings

    MCP client integration for LLMling agent.

    MCPClient

    MCP client for communicating with MCP servers.

    Source code in src/llmling_agent/mcp_server/client.py
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    318
    class MCPClient:
        """MCP client for communicating with MCP servers."""
    
        def __init__(
            self,
            transport_mode: TransportType = "stdio",
            elicitation_callback: Callable[
                [RequestContext, mcp.types.ElicitRequestParams],
                Awaitable[mcp.types.ElicitResult | mcp.types.ErrorData],
            ]
            | None = None,
            sampling_callback: Callable[
                [RequestContext, mcp.types.CreateMessageRequestParams],
                Awaitable[mcp.types.CreateMessageResult | mcp.types.ErrorData],
            ]
            | None = None,
            progress_handler: ProgressHandler | None = None,
            accessible_roots: list[str] | None = None,
        ):
            self.exit_stack = AsyncExitStack()
            self.session: ClientSession | None = None
            self._available_tools: list[Tool] = []
            self._old_stdout: TextIO | None = None
            self._transport_mode = transport_mode
            self._elicitation_callback = elicitation_callback
            self._sampling_callback = sampling_callback
            self._progress_handler = progress_handler
            self._accessible_roots = accessible_roots or []
    
        async def __aenter__(self) -> Self:
            """Enter context and redirect stdout if in stdio mode."""
            return self
    
        async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ):
            """Restore stdout if redirected and cleanup."""
            try:
                await self.cleanup()
            except RuntimeError as e:
                if "exit cancel scope in a different task" in str(e):
                    logger.warning("Ignoring known MCP cleanup issue: Task context mismatch")
                else:
                    raise
            except Exception:
                logger.exception("Error during MCP client cleanup")
                raise
    
        async def cleanup(self):
            """Clean up resources."""
            with suppress(RuntimeError) as cm:
                await self.exit_stack.aclose()
    
            if cm and cm.error and "exit cancel scope in a different task" in str(cm.error):
                logger.warning("Ignoring known MCP cleanup issue: Task context mismatch")
            elif cm and cm.error:
                raise cm.error
    
        async def _default_elicitation_callback(
            self,
            context: RequestContext,
            params: mcp.types.ElicitRequestParams,
        ) -> mcp.types.ElicitResult | mcp.types.ErrorData:
            """Default elicitation callback that returns not supported."""
            import mcp
    
            return mcp.types.ErrorData(
                code=mcp.types.INVALID_REQUEST,
                message="Elicitation not supported",
            )
    
        async def connect(
            self,
            command: str,
            args: list[str],
            env: dict[str, str] | None = None,
            url: str | None = None,
        ):
            """Connect to an MCP server.
    
            Args:
                command: Command to run (for stdio servers)
                args: Command arguments (for stdio servers)
                env: Optional environment variables
                url: Server URL (for SSE servers)
            """
            from mcp import ClientSession, StdioServerParameters
            from mcp.client.stdio import stdio_client
    
            if url:
                # SSE connection - just a placeholder for now
                logger.info("SSE servers not yet implemented")
                self.session = None
                return
            command = shutil.which(command) or command
            # Stdio connection
            params = StdioServerParameters(command=command, args=args, env=env)
            stdio_transport = await self.exit_stack.enter_async_context(stdio_client(params))
            stdio, write = stdio_transport
    
            # Create a wrapper that matches the expected signature
            async def elicitation_wrapper(context, params):
                if self._elicitation_callback:
                    return await self._elicitation_callback(context, params)
                return await self._default_elicitation_callback(context, params)
    
            async def sampling_wrapper(
                context: RequestContext,
                params: mcp.types.CreateMessageRequestParams,
            ) -> mcp.types.CreateMessageResult | mcp.types.ErrorData:
                if self._sampling_callback:
                    return await self._sampling_callback(context, params)
                # If no callback provided, let MCP SDK handle with its default
                import mcp
    
                return mcp.types.ErrorData(
                    code=mcp.types.INVALID_REQUEST,
                    message="Sampling not supported",
                )
    
            async def list_roots_wrapper(
                context: RequestContext,
            ) -> mcp.types.ListRootsResult | mcp.types.ErrorData:
                """List accessible filesystem roots."""
                from pathlib import Path
    
                import mcp
    
                roots = []
                for root_path in self._accessible_roots:
                    try:
                        path = Path(root_path).resolve()
                        if path.exists():
                            from pydantic import FileUrl
    
                            file_url = FileUrl(path.as_uri())
                            roots.append(
                                mcp.types.Root(uri=file_url, name=path.name or str(path))
                            )
                    except (OSError, ValueError):
                        # Skip invalid paths or inaccessible directories
                        continue
    
                return mcp.types.ListRootsResult(roots=roots)
    
            session = ClientSession(
                stdio,
                write,
                elicitation_callback=elicitation_wrapper,
                sampling_callback=sampling_wrapper,
                list_roots_callback=list_roots_wrapper,
            )
            self.session = await self.exit_stack.enter_async_context(session)
            assert self.session
            init_result = await self.session.initialize()
            info = init_result.serverInfo
            # Get available tools
            result = await self.session.list_tools()
            self._available_tools = result.tools
            logger.info("Connected to MCP server %s (%s)", info.name, info.version)
            logger.info("Available tools: %s", len(self._available_tools))
    
        def get_tools(self) -> list[dict]:
            """Get tools in OpenAI function format."""
            return [
                {"type": "function", "function": mcp_tool_to_fn_schema(tool)}
                for tool in self._available_tools
            ]
    
        async def list_prompts(self) -> mcp.types.ListPromptsResult:
            """Get available prompts from the server."""
            if not self.session:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
            return await self.session.list_prompts()
    
        async def list_resources(self) -> mcp.types.ListResourcesResult:
            """Get available resources from the server."""
            if not self.session:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
            return await self.session.list_resources()
    
        async def get_prompt(
            self, name: str, arguments: dict[str, str] | None = None
        ) -> mcp.types.GetPromptResult:
            """Get a specific prompt's content."""
            if not self.session:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
            return await self.session.get_prompt(name, arguments)
    
        def create_tool_callable(self, tool: MCPTool) -> Callable[..., Awaitable[str]]:
            """Create a properly typed callable from MCP tool schema."""
            from schemez.functionschema import FunctionSchema
    
            schema = mcp_tool_to_fn_schema(tool)
            fn_schema = FunctionSchema.from_dict(schema)
            sig = fn_schema.to_python_signature()
    
            async def tool_callable(ctx: RunContext, **kwargs: Any) -> str:
                """Dynamically generated MCP tool wrapper."""
                # Filter out None values for optional params to avoid MCP validation errors
                # Only include parameters that are either required or have non-None values
                schema_props = tool.inputSchema.get("properties", {})
                required_props = set(tool.inputSchema.get("required", []))
                filtered_kwargs = {
                    k: v
                    for k, v in kwargs.items()
                    if k in required_props or (k in schema_props and v is not None)
                }
                return await self.call_tool(
                    tool.name, filtered_kwargs, tool_call_id=ctx.tool_call_id
                )
    
            # Set proper signature and docstring
            tool_callable.__signature__ = sig  # type: ignore
            tool_callable.__annotations__ = fn_schema.get_annotations()
            tool_callable.__name__ = tool.name
            tool_callable.__doc__ = tool.description or "No description provided."
            return tool_callable
    
        async def call_tool(
            self,
            name: str,
            arguments: dict | None = None,
            tool_call_id: str | None = None,
        ) -> str:
            """Call an MCP tool with optional ACP progress bridging."""
            from mcp.types import TextContent, TextResourceContents
    
            if not self.session:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
    
            # Create progress callback if handler and tool_call_id available
            progress_callback = None
            if self._progress_handler and tool_call_id:
                progress_callback = self._create_progress_callback(
                    name, tool_call_id, arguments or {}
                )
    
            try:
                result = await self.session.call_tool(
                    name, arguments or {}, progress_callback=progress_callback
                )
                if not isinstance(result.content[0], TextResourceContents | TextContent):
                    msg = "Tool returned a non-text response"
                    raise TypeError(msg)  # noqa: TRY301
                return result.content[0].text
            except Exception as e:
                msg = f"MCP tool call failed: {e}"
                raise RuntimeError(msg) from e
    
        def _create_progress_callback(
            self,
            tool_name: str,
            tool_call_id: str,
            tool_input: dict,
        ) -> ProgressFnT:
            """Create progress callback that uses the progress notification handler."""
    
            async def progress_callback(
                progress: float, total: float | None = None, message: str | None = None
            ) -> None:
                if not self._progress_handler:
                    return
    
                try:
                    await self._progress_handler(
                        tool_name,
                        tool_call_id,
                        tool_input,
                        progress,
                        total,
                        message,
                    )
                except Exception as e:  # noqa: BLE001
                    logger.warning("Progress notification handler failed: %s", e)
    
            return progress_callback
    

    __aenter__ async

    __aenter__() -> Self
    

    Enter context and redirect stdout if in stdio mode.

    Source code in src/llmling_agent/mcp_server/client.py
    64
    65
    66
    async def __aenter__(self) -> Self:
        """Enter context and redirect stdout if in stdio mode."""
        return self
    

    __aexit__ async

    __aexit__(
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    )
    

    Restore stdout if redirected and cleanup.

    Source code in src/llmling_agent/mcp_server/client.py
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ):
        """Restore stdout if redirected and cleanup."""
        try:
            await self.cleanup()
        except RuntimeError as e:
            if "exit cancel scope in a different task" in str(e):
                logger.warning("Ignoring known MCP cleanup issue: Task context mismatch")
            else:
                raise
        except Exception:
            logger.exception("Error during MCP client cleanup")
            raise
    

    call_tool async

    call_tool(
        name: str, arguments: dict | None = None, tool_call_id: str | None = None
    ) -> str
    

    Call an MCP tool with optional ACP progress bridging.

    Source code in src/llmling_agent/mcp_server/client.py
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    async def call_tool(
        self,
        name: str,
        arguments: dict | None = None,
        tool_call_id: str | None = None,
    ) -> str:
        """Call an MCP tool with optional ACP progress bridging."""
        from mcp.types import TextContent, TextResourceContents
    
        if not self.session:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)
    
        # Create progress callback if handler and tool_call_id available
        progress_callback = None
        if self._progress_handler and tool_call_id:
            progress_callback = self._create_progress_callback(
                name, tool_call_id, arguments or {}
            )
    
        try:
            result = await self.session.call_tool(
                name, arguments or {}, progress_callback=progress_callback
            )
            if not isinstance(result.content[0], TextResourceContents | TextContent):
                msg = "Tool returned a non-text response"
                raise TypeError(msg)  # noqa: TRY301
            return result.content[0].text
        except Exception as e:
            msg = f"MCP tool call failed: {e}"
            raise RuntimeError(msg) from e
    

    cleanup async

    cleanup()
    

    Clean up resources.

    Source code in src/llmling_agent/mcp_server/client.py
    86
    87
    88
    89
    90
    91
    92
    93
    94
    async def cleanup(self):
        """Clean up resources."""
        with suppress(RuntimeError) as cm:
            await self.exit_stack.aclose()
    
        if cm and cm.error and "exit cancel scope in a different task" in str(cm.error):
            logger.warning("Ignoring known MCP cleanup issue: Task context mismatch")
        elif cm and cm.error:
            raise cm.error
    

    connect async

    connect(
        command: str,
        args: list[str],
        env: dict[str, str] | None = None,
        url: str | None = None,
    )
    

    Connect to an MCP server.

    Parameters:

    Name Type Description Default
    command str

    Command to run (for stdio servers)

    required
    args list[str]

    Command arguments (for stdio servers)

    required
    env dict[str, str] | None

    Optional environment variables

    None
    url str | None

    Server URL (for SSE servers)

    None
    Source code in src/llmling_agent/mcp_server/client.py
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    async def connect(
        self,
        command: str,
        args: list[str],
        env: dict[str, str] | None = None,
        url: str | None = None,
    ):
        """Connect to an MCP server.
    
        Args:
            command: Command to run (for stdio servers)
            args: Command arguments (for stdio servers)
            env: Optional environment variables
            url: Server URL (for SSE servers)
        """
        from mcp import ClientSession, StdioServerParameters
        from mcp.client.stdio import stdio_client
    
        if url:
            # SSE connection - just a placeholder for now
            logger.info("SSE servers not yet implemented")
            self.session = None
            return
        command = shutil.which(command) or command
        # Stdio connection
        params = StdioServerParameters(command=command, args=args, env=env)
        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(params))
        stdio, write = stdio_transport
    
        # Create a wrapper that matches the expected signature
        async def elicitation_wrapper(context, params):
            if self._elicitation_callback:
                return await self._elicitation_callback(context, params)
            return await self._default_elicitation_callback(context, params)
    
        async def sampling_wrapper(
            context: RequestContext,
            params: mcp.types.CreateMessageRequestParams,
        ) -> mcp.types.CreateMessageResult | mcp.types.ErrorData:
            if self._sampling_callback:
                return await self._sampling_callback(context, params)
            # If no callback provided, let MCP SDK handle with its default
            import mcp
    
            return mcp.types.ErrorData(
                code=mcp.types.INVALID_REQUEST,
                message="Sampling not supported",
            )
    
        async def list_roots_wrapper(
            context: RequestContext,
        ) -> mcp.types.ListRootsResult | mcp.types.ErrorData:
            """List accessible filesystem roots."""
            from pathlib import Path
    
            import mcp
    
            roots = []
            for root_path in self._accessible_roots:
                try:
                    path = Path(root_path).resolve()
                    if path.exists():
                        from pydantic import FileUrl
    
                        file_url = FileUrl(path.as_uri())
                        roots.append(
                            mcp.types.Root(uri=file_url, name=path.name or str(path))
                        )
                except (OSError, ValueError):
                    # Skip invalid paths or inaccessible directories
                    continue
    
            return mcp.types.ListRootsResult(roots=roots)
    
        session = ClientSession(
            stdio,
            write,
            elicitation_callback=elicitation_wrapper,
            sampling_callback=sampling_wrapper,
            list_roots_callback=list_roots_wrapper,
        )
        self.session = await self.exit_stack.enter_async_context(session)
        assert self.session
        init_result = await self.session.initialize()
        info = init_result.serverInfo
        # Get available tools
        result = await self.session.list_tools()
        self._available_tools = result.tools
        logger.info("Connected to MCP server %s (%s)", info.name, info.version)
        logger.info("Available tools: %s", len(self._available_tools))
    

    create_tool_callable

    create_tool_callable(tool: Tool) -> Callable[..., Awaitable[str]]
    

    Create a properly typed callable from MCP tool schema.

    Source code in src/llmling_agent/mcp_server/client.py
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    def create_tool_callable(self, tool: MCPTool) -> Callable[..., Awaitable[str]]:
        """Create a properly typed callable from MCP tool schema."""
        from schemez.functionschema import FunctionSchema
    
        schema = mcp_tool_to_fn_schema(tool)
        fn_schema = FunctionSchema.from_dict(schema)
        sig = fn_schema.to_python_signature()
    
        async def tool_callable(ctx: RunContext, **kwargs: Any) -> str:
            """Dynamically generated MCP tool wrapper."""
            # Filter out None values for optional params to avoid MCP validation errors
            # Only include parameters that are either required or have non-None values
            schema_props = tool.inputSchema.get("properties", {})
            required_props = set(tool.inputSchema.get("required", []))
            filtered_kwargs = {
                k: v
                for k, v in kwargs.items()
                if k in required_props or (k in schema_props and v is not None)
            }
            return await self.call_tool(
                tool.name, filtered_kwargs, tool_call_id=ctx.tool_call_id
            )
    
        # Set proper signature and docstring
        tool_callable.__signature__ = sig  # type: ignore
        tool_callable.__annotations__ = fn_schema.get_annotations()
        tool_callable.__name__ = tool.name
        tool_callable.__doc__ = tool.description or "No description provided."
        return tool_callable
    

    get_prompt async

    get_prompt(name: str, arguments: dict[str, str] | None = None) -> GetPromptResult
    

    Get a specific prompt's content.

    Source code in src/llmling_agent/mcp_server/client.py
    221
    222
    223
    224
    225
    226
    227
    228
    async def get_prompt(
        self, name: str, arguments: dict[str, str] | None = None
    ) -> mcp.types.GetPromptResult:
        """Get a specific prompt's content."""
        if not self.session:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)
        return await self.session.get_prompt(name, arguments)
    

    get_tools

    get_tools() -> list[dict]
    

    Get tools in OpenAI function format.

    Source code in src/llmling_agent/mcp_server/client.py
    200
    201
    202
    203
    204
    205
    def get_tools(self) -> list[dict]:
        """Get tools in OpenAI function format."""
        return [
            {"type": "function", "function": mcp_tool_to_fn_schema(tool)}
            for tool in self._available_tools
        ]
    

    list_prompts async

    list_prompts() -> ListPromptsResult
    

    Get available prompts from the server.

    Source code in src/llmling_agent/mcp_server/client.py
    207
    208
    209
    210
    211
    212
    async def list_prompts(self) -> mcp.types.ListPromptsResult:
        """Get available prompts from the server."""
        if not self.session:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)
        return await self.session.list_prompts()
    

    list_resources async

    list_resources() -> ListResourcesResult
    

    Get available resources from the server.

    Source code in src/llmling_agent/mcp_server/client.py
    214
    215
    216
    217
    218
    219
    async def list_resources(self) -> mcp.types.ListResourcesResult:
        """Get available resources from the server."""
        if not self.session:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)
        return await self.session.list_resources()
    

    mcp_tool_to_fn_schema

    mcp_tool_to_fn_schema(tool: Tool) -> dict[str, Any]
    

    Convert MCP tool to OpenAI function schema.

    Source code in src/llmling_agent/mcp_server/client.py
    29
    30
    31
    32
    def mcp_tool_to_fn_schema(tool: MCPTool) -> dict[str, Any]:
        """Convert MCP tool to OpenAI function schema."""
        desc = tool.description or "No description provided"
        return {"name": tool.name, "description": desc, "parameters": tool.inputSchema}