Skip to content

mcp_server

Class info

Classes

Name Children Inherits
MCPClient
llmling_agent.mcp_server.client
MCP client for communicating with MCP servers.

    🛈 DocStrings

    MCP server integration for LLMling agent.

    MCPClient

    MCP client for communicating with MCP servers.

    Source code in src/llmling_agent/mcp_server/client.py
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    class MCPClient:
        """MCP client for communicating with MCP servers.

        Intended for use as an async context manager: entering optionally
        redirects ``sys.stdout`` to ``sys.stderr`` (stdio mode); exiting restores
        stdout and closes everything registered on the internal
        ``AsyncExitStack``.
        """

        def __init__(self, stdio_mode: bool = False):
            """Create an unconnected client.

            Args:
                stdio_mode: When true, ``__aenter__`` redirects ``sys.stdout`` to
                    ``sys.stderr`` until the context is exited.
            """
            self.exit_stack = AsyncExitStack()
            self.session: ClientSession | None = None
            self._available_tools: list[Tool] = []
            # Original stdout, remembered only while a redirect is active.
            self._old_stdout: TextIO | None = None
            self._stdio_mode = stdio_mode

        async def __aenter__(self) -> Self:
            """Enter context and redirect stdout if in stdio mode.

            Raises:
                RuntimeError: If setting up the redirect fails.
            """
            try:
                if self._stdio_mode:
                    # NOTE(review): presumably keeps stray prints off the stdio
                    # transport of the server process - confirm.
                    self._old_stdout = sys.stdout
                    sys.stdout = sys.stderr
                    logger.info("Redirecting stdout for stdio MCP server")
            except Exception as e:
                msg = "Failed to enter MCP client context"
                logger.exception(msg, exc_info=e)
                raise RuntimeError(msg) from e
            return self

        async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None:
            """Restore stdout if redirected and cleanup.

            A ``RuntimeError`` mentioning "exit cancel scope in a different task"
            is logged and ignored; any other exception is re-raised.
            """
            try:
                # Compare against None: a replaced stdout object may be falsy.
                if self._old_stdout is not None:
                    sys.stdout = self._old_stdout
                    # Drop the reference so a second exit cannot re-apply it.
                    self._old_stdout = None
                await self.cleanup()
            except RuntimeError as e:
                if "exit cancel scope in a different task" in str(e):
                    logger.warning("Ignoring known MCP cleanup issue: Task context mismatch")
                else:
                    raise
            except Exception:
                logger.exception("Error during MCP client cleanup")
                raise

        async def cleanup(self) -> None:
            """Clean up resources.

            Closes the exit stack. Only the known "exit cancel scope in a
            different task" ``RuntimeError`` is ignored (with a warning).

            Raises:
                RuntimeError: Any other runtime error raised while closing.
            """
            # contextlib.suppress does not expose the caught exception, so an
            # explicit try/except is needed to inspect the message.
            try:
                await self.exit_stack.aclose()
            except RuntimeError as exc:
                if "exit cancel scope in a different task" not in str(exc):
                    raise
                logger.warning("Ignoring known MCP cleanup issue: Task context mismatch")

        async def connect(
            self,
            command: str,
            args: list[str],
            env: dict[str, str] | None = None,
            url: str | None = None,
        ):
            """Connect to an MCP server.

            When ``url`` is given, nothing is connected: the session is reset to
            ``None`` and the method returns (SSE is not implemented). Otherwise a
            stdio transport and client session are opened on the exit stack, the
            session is initialized and the server's tool list is cached.

            Args:
                command: Command to run (for stdio servers)
                args: Command arguments (for stdio servers)
                env: Optional environment variables
                url: Server URL (for SSE servers)
            """
            from mcp import ClientSession, StdioServerParameters
            from mcp.client.stdio import stdio_client

            if url:
                # SSE connection - just a placeholder for now
                logger.info("SSE servers not yet implemented")
                self.session = None
                return

            # Stdio connection
            params = StdioServerParameters(command=command, args=args, env=env)
            stdio_transport = await self.exit_stack.enter_async_context(stdio_client(params))
            stdio, write = stdio_transport
            session = ClientSession(stdio, write)
            self.session = await self.exit_stack.enter_async_context(session)
            assert self.session
            init_result = await self.session.initialize()
            info = init_result.serverInfo
            # Get available tools
            result = await self.session.list_tools()
            self._available_tools = result.tools
            logger.info("Connected to MCP server %s (%s)", info.name, info.version)
            logger.info("Available tools: %s", len(self._available_tools))

        def get_tools(self) -> list[dict]:
            """Get tools in OpenAI function format.

            Returns:
                One ``{"type": "function", "function": ...}`` entry per tool
                cached by ``connect``.
            """
            return [
                {"type": "function", "function": mcp_tool_to_fn_schema(tool)}
                for tool in self._available_tools
            ]

        async def list_prompts(self) -> mcp.types.ListPromptsResult:
            """Get available prompts from the server.

            Raises:
                RuntimeError: If no session is connected.
            """
            if not self.session:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
            return await self.session.list_prompts()

        async def list_resources(self) -> mcp.types.ListResourcesResult:
            """Get available resources from the server.

            Raises:
                RuntimeError: If no session is connected.
            """
            if not self.session:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
            return await self.session.list_resources()

        async def get_prompt(self, name: str) -> mcp.types.GetPromptResult:
            """Get a specific prompt's content.

            Args:
                name: Name of the prompt to fetch.

            Raises:
                RuntimeError: If no session is connected.
            """
            if not self.session:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)
            return await self.session.get_prompt(name)

        def create_tool_callable(self, tool: MCPTool) -> Callable[..., Awaitable[str]]:
            """Create a properly typed callable from MCP tool schema.

            Args:
                tool: Tool whose schema supplies the signature, annotations,
                    name and description of the returned coroutine function.

            Returns:
                An async function that forwards its keyword arguments to
                ``call_tool`` under the tool's name.
            """
            from py2openai.functionschema import FunctionSchema

            schema = mcp_tool_to_fn_schema(tool)
            fn_schema = FunctionSchema.from_dict(schema)
            sig = fn_schema.to_python_signature()

            async def tool_callable(**kwargs: Any) -> str:
                """Dynamically generated MCP tool wrapper."""
                return await self.call_tool(tool.name, kwargs)

            # Set proper signature and docstring
            tool_callable.__signature__ = sig  # type: ignore
            tool_callable.__annotations__ = fn_schema.get_annotations()
            tool_callable.__name__ = tool.name
            tool_callable.__doc__ = tool.description or "No description provided."
            return tool_callable

        async def call_tool(self, name: str, arguments: dict | None = None) -> str:
            """Call an MCP tool.

            Args:
                name: Name of the tool to invoke.
                arguments: Tool arguments; ``None`` is sent as an empty dict.

            Returns:
                The ``text`` of the first content item in the result.

            Raises:
                RuntimeError: If not connected, or if the call fails for any
                    reason (no content, non-text first item, session error).
            """
            from mcp.types import EmbeddedResource, ImageContent

            if not self.session:
                msg = "Not connected to MCP server"
                raise RuntimeError(msg)

            try:
                result = await self.session.call_tool(name, arguments or {})
                # Guard explicitly so an empty result does not surface as an
                # opaque "list index out of range" message.
                if not result.content:
                    msg = "Tool returned no content"
                    raise ValueError(msg)  # noqa: TRY301
                first = result.content[0]
                if isinstance(first, EmbeddedResource | ImageContent):
                    msg = "Tool returned an embedded resource"
                    raise TypeError(msg)  # noqa: TRY301
                return first.text
            except Exception as e:
                msg = f"MCP tool call failed: {e}"
                raise RuntimeError(msg) from e

    __aenter__ async

    __aenter__() -> Self
    

    Enter context and redirect stdout if in stdio mode.

    Source code in src/llmling_agent/mcp_server/client.py
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    async def __aenter__(self) -> Self:
        """Enter context and redirect stdout if in stdio mode."""
        try:
            if self._stdio_mode:
                self._old_stdout = sys.stdout
                sys.stdout = sys.stderr
                logger.info("Redirecting stdout for stdio MCP server")
        except Exception as e:
            msg = "Failed to enter MCP client context"
            logger.exception(msg, exc_info=e)
            raise RuntimeError(msg) from e
        return self
    

    __aexit__ async

    __aexit__(
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    )
    

    Restore stdout if redirected and cleanup.

    Source code in src/llmling_agent/mcp_server/client.py
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ):
        """Restore stdout if redirected and cleanup."""
        try:
            if self._old_stdout:
                sys.stdout = self._old_stdout
            await self.cleanup()
        except RuntimeError as e:
            if "exit cancel scope in a different task" in str(e):
                logger.warning("Ignoring known MCP cleanup issue: Task context mismatch")
            else:
                raise
        except Exception:
            logger.exception("Error during MCP client cleanup")
            raise
    

    call_tool async

    call_tool(name: str, arguments: dict | None = None) -> str
    

    Call an MCP tool.

    Source code in src/llmling_agent/mcp_server/client.py
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    async def call_tool(self, name: str, arguments: dict | None = None) -> str:
        """Call an MCP tool."""
        from mcp.types import EmbeddedResource, ImageContent
    
        if not self.session:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)
    
        try:
            result = await self.session.call_tool(name, arguments or {})
            if isinstance(result.content[0], EmbeddedResource | ImageContent):
                msg = "Tool returned an embedded resource"
                raise TypeError(msg)  # noqa: TRY301
            return result.content[0].text
        except Exception as e:
            msg = f"MCP tool call failed: {e}"
            raise RuntimeError(msg) from e
    

    cleanup async

    cleanup()
    

    Clean up resources.

    Source code in src/llmling_agent/mcp_server/client.py
    80
    81
    82
    83
    84
    85
    86
    87
    88
    async def cleanup(self):
        """Clean up resources."""
        with suppress(RuntimeError) as cm:
            await self.exit_stack.aclose()
    
        if cm and cm.error and "exit cancel scope in a different task" in str(cm.error):
            logger.warning("Ignoring known MCP cleanup issue: Task context mismatch")
        elif cm and cm.error:
            raise cm.error
    

    connect async

    connect(
        command: str,
        args: list[str],
        env: dict[str, str] | None = None,
        url: str | None = None,
    )
    

    Connect to an MCP server.

    Parameters:

    Name Type Description Default
    command str

    Command to run (for stdio servers)

    required
    args list[str]

    Command arguments (for stdio servers)

    required
    env dict[str, str] | None

    Optional environment variables

    None
    url str | None

    Server URL (for SSE servers)

    None
    Source code in src/llmling_agent/mcp_server/client.py
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    async def connect(
        self,
        command: str,
        args: list[str],
        env: dict[str, str] | None = None,
        url: str | None = None,
    ):
        """Open a connection to an MCP server.

        With a ``url`` nothing is connected: the session is reset to ``None``
        and the call returns, since SSE is only a placeholder. Otherwise a stdio
        transport and a client session are entered on the exit stack, the
        session is initialized, and the server's tool list is cached.

        Args:
            command: Command to run (for stdio servers)
            args: Command arguments (for stdio servers)
            env: Optional environment variables
            url: Server URL (for SSE servers)
        """
        from mcp import ClientSession, StdioServerParameters
        from mcp.client.stdio import stdio_client

        if url:
            # SSE transport is not wired up yet.
            logger.info("SSE servers not yet implemented")
            self.session = None
            return

        # Stdio transport: start the server process and wrap its streams.
        server_params = StdioServerParameters(command=command, args=args, env=env)
        read_stream, write_stream = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(read_stream, write_stream)
        )
        assert self.session
        server_info = (await self.session.initialize()).serverInfo
        # Cache the tool list advertised by the server.
        self._available_tools = (await self.session.list_tools()).tools
        logger.info("Connected to MCP server %s (%s)", server_info.name, server_info.version)
        logger.info("Available tools: %s", len(self._available_tools))
    

    create_tool_callable

    create_tool_callable(tool: Tool) -> Callable[..., Awaitable[str]]
    

    Create a properly typed callable from MCP tool schema.

    Source code in src/llmling_agent/mcp_server/client.py
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    def create_tool_callable(self, tool: MCPTool) -> Callable[..., Awaitable[str]]:
        """Build an async wrapper function for an MCP tool.

        The wrapper forwards its keyword arguments to ``call_tool`` under the
        tool's name, and carries the signature, annotations, name and
        description derived from the tool's schema.
        """
        from py2openai.functionschema import FunctionSchema

        parsed = FunctionSchema.from_dict(mcp_tool_to_fn_schema(tool))
        signature = parsed.to_python_signature()

        async def tool_callable(**kwargs: Any) -> str:
            """Dynamically generated MCP tool wrapper."""
            return await self.call_tool(tool.name, kwargs)

        # Make the wrapper introspect like the tool it stands in for.
        tool_callable.__signature__ = signature  # type: ignore
        tool_callable.__annotations__ = parsed.get_annotations()
        tool_callable.__name__ = tool.name
        description = tool.description
        tool_callable.__doc__ = description if description else "No description provided."
        return tool_callable
    

    get_prompt async

    get_prompt(name: str) -> GetPromptResult
    

    Get a specific prompt's content.

    Source code in src/llmling_agent/mcp_server/client.py
    150
    151
    152
    153
    154
    155
    async def get_prompt(self, name: str) -> mcp.types.GetPromptResult:
        """Get a specific prompt's content."""
        if not self.session:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)
        return await self.session.get_prompt(name)
    

    get_tools

    get_tools() -> list[dict]
    

    Get tools in OpenAI function format.

    Source code in src/llmling_agent/mcp_server/client.py
    129
    130
    131
    132
    133
    134
    def get_tools(self) -> list[dict]:
        """Get tools in OpenAI function format."""
        return [
            {"type": "function", "function": mcp_tool_to_fn_schema(tool)}
            for tool in self._available_tools
        ]
    

    list_prompts async

    list_prompts() -> ListPromptsResult
    

    Get available prompts from the server.

    Source code in src/llmling_agent/mcp_server/client.py
    136
    137
    138
    139
    140
    141
    async def list_prompts(self) -> mcp.types.ListPromptsResult:
        """Get available prompts from the server."""
        if not self.session:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)
        return await self.session.list_prompts()
    

    list_resources async

    list_resources() -> ListResourcesResult
    

    Get available resources from the server.

    Source code in src/llmling_agent/mcp_server/client.py
    143
    144
    145
    146
    147
    148
    async def list_resources(self) -> mcp.types.ListResourcesResult:
        """Get available resources from the server."""
        if not self.session:
            msg = "Not connected to MCP server"
            raise RuntimeError(msg)
        return await self.session.list_resources()