Skip to content

manager

Class info

Classes

Name Children Inherits
MCPClient
llmling_agent.mcp_server.client
MCP client for communicating with MCP servers.
    MCPManager
    llmling_agent.mcp_server.manager
    Manages MCP server connections and tools.
      ResourceInfo
      llmling_agent.models.resources
      Information about an available resource.
        ResourceProvider
        llmling_agent.resource_providers.base
        Base class for resource providers.
        SSEMCPServer
        llmling_agent.models.mcp_server
        MCP server using Server-Sent Events transport.
          StdioMCPServer
          llmling_agent.models.mcp_server
          MCP server started via stdio.

            🛈 DocStrings

            Tool management for LLMling agents.

            MCPManager

            Bases: ResourceProvider

            Manages MCP server connections and tools.

            Source code in src/llmling_agent/mcp_server/manager.py
             51
             52
             53
             54
             55
             56
             57
             58
             59
             60
             61
             62
             63
             64
             65
             66
             67
             68
             69
             70
             71
             72
             73
             74
             75
             76
             77
             78
             79
             80
             81
             82
             83
             84
             85
             86
             87
             88
             89
             90
             91
             92
             93
             94
             95
             96
             97
             98
             99
            100
            101
            102
            103
            104
            105
            106
            107
            108
            109
            110
            111
            112
            113
            114
            115
            116
            117
            118
            119
            120
            121
            122
            123
            124
            125
            126
            127
            128
            129
            130
            131
            132
            133
            134
            135
            136
            137
            138
            139
            140
            141
            142
            143
            144
            145
            146
            147
            148
            149
            150
            151
            152
            153
            154
            155
            156
            157
            158
            159
            160
            161
            162
            163
            164
            165
            166
            167
            168
            169
            170
            171
            172
            173
            174
            175
            176
            177
            178
            179
            180
            181
            182
            183
            184
            185
            186
            187
            188
            189
            190
            191
            192
            193
            194
            195
            196
            197
            198
            199
            200
            class MCPManager(ResourceProvider):
                """Manages MCP server connections and tools."""
            
                def __init__(
                    self,
                    name: str = "mcp",
                    owner: str | None = None,
                    servers: Sequence[MCPServerConfig | str] | None = None,
                    context: NodeContext | None = None,
                ):
                    super().__init__(name, owner=owner)
                    self.servers: list[MCPServerConfig] = []
                    for server in servers or []:
                        self.add_server_config(server)
                    self.context = context
                    self.clients: dict[str, MCPClient] = {}
                    self.exit_stack = AsyncExitStack()
            
                @property
                def requires_async(self) -> bool:
                    return True
            
                def add_server_config(self, server: MCPServerConfig | str):
                    """Add a new MCP server to the manager."""
                    server = StdioMCPServer.from_string(server) if isinstance(server, str) else server
                    self.servers.append(server)
            
                def __repr__(self) -> str:
                    return f"MCPManager({self.servers!r})"
            
                async def __aenter__(self) -> Self:
                    try:
                        # Setup directly provided servers
                        for server in self.servers:
                            await self.setup_server(server)
            
                        # Setup servers from context if available
                        if self.context and self.context.config and self.context.config.mcp_servers:
                            for server in self.context.config.get_mcp_servers():
                                await self.setup_server(server)
            
                    except Exception as e:
                        # Clean up in case of error
                        await self.__aexit__(type(e), e, e.__traceback__)
                        msg = "Failed to initialize MCP manager"
                        raise RuntimeError(msg) from e
            
                    return self
            
                async def __aexit__(self, *exc):
                    await self.cleanup()
            
                async def setup_server(self, config: MCPServerConfig):
                    """Set up a single MCP server connection."""
                    if not config.enabled:
                        return
                    env = config.get_env_vars()
                    match config:
                        case StdioMCPServer():
                            client = MCPClient(stdio_mode=True)
                            client = await self.exit_stack.enter_async_context(client)
                            await client.connect(config.command, args=config.args, env=env)
                            client_id = f"{config.command}_{' '.join(config.args)}"
                        case SSEMCPServer():
                            client = MCPClient(stdio_mode=False)
                            client = await self.exit_stack.enter_async_context(client)
                            await client.connect("", [], url=config.url, env=env)
                            client_id = f"sse_{config.url}"
            
                    self.clients[client_id] = client
            
                async def get_tools(self) -> list[ToolInfo]:
                    """Get all tools from all connected servers."""
                    from llmling_agent.tools.base import ToolInfo
            
                    tools: list[ToolInfo] = []
                    for client in self.clients.values():
                        for tool in client._available_tools:
                            try:
                                fn = client.create_tool_callable(tool)
                                meta = {"mcp_tool": tool.name}
                                tool_info = ToolInfo.from_callable(fn, source="mcp", metadata=meta)
                                tools.append(tool_info)
                                logger.debug("Registered MCP tool: %s", tool.name)
                            except Exception:
                                msg = "Failed to create tool from MCP tool: %s"
                                logger.exception(msg, tool.name)
                                continue
            
                    return tools
            
                async def list_prompts(self) -> list[StaticPrompt]:
                    """Get all available prompts from MCP servers."""
                    prompts = []
                    for client in self.clients.values():
                        try:
                            result = await client.list_prompts()
                        except Exception:
                            logger.exception("Failed to get prompts from MCP server")
                        for prompt in result.prompts:
                            try:
                                converted = await convert_mcp_prompt(client, prompt)
                                prompts.append(converted)
                            except Exception:
                                logger.exception("Failed to convert prompt: %s", prompt.name)
                    return prompts
            
                async def list_resources(self) -> list[ResourceInfo]:
                    """Get all available resources from MCP servers."""
                    resources = []
                    for client in self.clients.values():
                        try:
                            result = await client.list_resources()
                        except Exception:
                            logger.exception("Failed to get resources from MCP server")
                        for resource in result.resources:
                            try:
                                converted = await convert_mcp_resource(resource)
                                resources.append(converted)
                            except Exception:
                                logger.exception("Failed to convert resource: %s", resource.name)
                    return resources
            
                async def cleanup(self):
                    """Clean up all MCP connections."""
                    try:
                        try:
                            # Clean up exit stack (which includes MCP clients)
                            await self.exit_stack.aclose()
                        except RuntimeError as e:
                            if "different task" in str(e):
                                # Handle task context mismatch
                                current_task = asyncio.current_task()
                                if current_task:
                                    loop = asyncio.get_running_loop()
                                    await loop.create_task(self.exit_stack.aclose())
                            else:
                                raise
            
                        self.clients.clear()
            
                    except Exception as e:
                        msg = "Error during MCP manager cleanup"
                        logger.exception(msg, exc_info=e)
                        raise RuntimeError(msg) from e
            
                @property
                def active_servers(self) -> list[str]:
                    """Get IDs of active servers."""
                    return list(self.clients)
            

            active_servers property

            active_servers: list[str]
            

            Get IDs of active servers.

            add_server_config

            add_server_config(server: MCPServerConfig | str)
            

            Add a new MCP server to the manager.

            Source code in src/llmling_agent/mcp_server/manager.py
            73
            74
            75
            76
            def add_server_config(self, server: MCPServerConfig | str):
                """Register a server configuration with this manager.

                A string is first parsed with ``StdioMCPServer.from_string``; any other
                value is stored exactly as given. Nothing is connected here.
                """
                if isinstance(server, str):
                    server = StdioMCPServer.from_string(server)
                self.servers.append(server)
            

            cleanup async

            cleanup()
            

            Clean up all MCP connections.

            Source code in src/llmling_agent/mcp_server/manager.py
            174
            175
            176
            177
            178
            179
            180
            181
            182
            183
            184
            185
            186
            187
            188
            189
            190
            191
            192
            193
            194
            195
            async def cleanup(self):
                """Clean up all MCP connections.

                Closes the exit stack and then empties the ``clients`` mapping. When
                closing raises, the mapping is left untouched.

                Raises:
                    RuntimeError: If closing fails; the original exception is logged
                        and chained as the cause.
                """
                try:
                    try:
                        # Clean up exit stack (which includes MCP clients)
                        await self.exit_stack.aclose()
                    except RuntimeError as e:
                        if "different task" in str(e):
                            # Handle task context mismatch: retry the close from a new
                            # task created on the running loop and wait for it.
                            # NOTE(review): presumably this targets errors from contexts
                            # that must be exited in the task that entered them - confirm.
                            current_task = asyncio.current_task()
                            if current_task:
                                loop = asyncio.get_running_loop()
                                await loop.create_task(self.exit_stack.aclose())
                        else:
                            # Any other RuntimeError is handled by the outer except.
                            raise

                    self.clients.clear()

                except Exception as e:
                    msg = "Error during MCP manager cleanup"
                    logger.exception(msg, exc_info=e)
                    raise RuntimeError(msg) from e
            

            get_tools async

            get_tools() -> list[ToolInfo]
            

            Get all tools from all connected servers.

            Source code in src/llmling_agent/mcp_server/manager.py
            122
            123
            124
            125
            126
            127
            128
            129
            130
            131
            132
            133
            134
            135
            136
            137
            138
            139
            140
            async def get_tools(self) -> list[ToolInfo]:
                """Collect a ``ToolInfo`` for every tool exposed by the connected clients.

                Each tool is wrapped individually; a tool that fails to wrap is logged
                with its traceback and left out of the result.
                """
                from llmling_agent.tools.base import ToolInfo

                collected: list[ToolInfo] = []
                for mcp_client in self.clients.values():
                    for mcp_tool in mcp_client._available_tools:
                        try:
                            tool_callable = mcp_client.create_tool_callable(mcp_tool)
                            info = ToolInfo.from_callable(
                                tool_callable,
                                source="mcp",
                                metadata={"mcp_tool": mcp_tool.name},
                            )
                            collected.append(info)
                            logger.debug("Registered MCP tool: %s", mcp_tool.name)
                        except Exception:
                            logger.exception(
                                "Failed to create tool from MCP tool: %s", mcp_tool.name
                            )

                return collected
            

            list_prompts async

            list_prompts() -> list[StaticPrompt]
            

            Get all available prompts from MCP servers.

            Source code in src/llmling_agent/mcp_server/manager.py
            142
            143
            144
            145
            146
            147
            148
            149
            150
            151
            152
            153
            154
            155
            156
            async def list_prompts(self) -> list[StaticPrompt]:
                """Get all available prompts from MCP servers.

                A client whose listing call fails is logged and skipped; a prompt that
                fails to convert is logged and skipped individually.
                """
                prompts: list[StaticPrompt] = []
                for client in self.clients.values():
                    try:
                        result = await client.list_prompts()
                    except Exception:
                        logger.exception("Failed to get prompts from MCP server")
                        # No result for this client: move on instead of reading an
                        # unbound (or previous client's) ``result``.
                        continue
                    for prompt in result.prompts:
                        try:
                            converted = await convert_mcp_prompt(client, prompt)
                            prompts.append(converted)
                        except Exception:
                            logger.exception("Failed to convert prompt: %s", prompt.name)
                return prompts
            

            list_resources async

            list_resources() -> list[ResourceInfo]
            

            Get all available resources from MCP servers.

            Source code in src/llmling_agent/mcp_server/manager.py
            158
            159
            160
            161
            162
            163
            164
            165
            166
            167
            168
            169
            170
            171
            172
            async def list_resources(self) -> list[ResourceInfo]:
                """Get all available resources from MCP servers.

                A client whose listing call fails is logged and skipped; a resource that
                fails to convert is logged and skipped individually.
                """
                resources: list[ResourceInfo] = []
                for client in self.clients.values():
                    try:
                        result = await client.list_resources()
                    except Exception:
                        logger.exception("Failed to get resources from MCP server")
                        # No result for this client: move on instead of reading an
                        # unbound (or previous client's) ``result``.
                        continue
                    for resource in result.resources:
                        try:
                            converted = await convert_mcp_resource(resource)
                            resources.append(converted)
                        except Exception:
                            logger.exception("Failed to convert resource: %s", resource.name)
                return resources
            

            setup_server async

            setup_server(config: MCPServerConfig)
            

            Set up a single MCP server connection.

            Source code in src/llmling_agent/mcp_server/manager.py
            103
            104
            105
            106
            107
            108
            109
            110
            111
            112
            113
            114
            115
            116
            117
            118
            119
            120
            async def setup_server(self, config: MCPServerConfig):
                """Set up a single MCP server connection."""
                if not config.enabled:
                    return
                env = config.get_env_vars()
                match config:
                    case StdioMCPServer():
                        client = MCPClient(stdio_mode=True)
                        client = await self.exit_stack.enter_async_context(client)
                        await client.connect(config.command, args=config.args, env=env)
                        client_id = f"{config.command}_{' '.join(config.args)}"
                    case SSEMCPServer():
                        client = MCPClient(stdio_mode=False)
                        client = await self.exit_stack.enter_async_context(client)
                        await client.connect("", [], url=config.url, env=env)
                        client_id = f"sse_{config.url}"
            
                self.clients[client_id] = client
            

            convert_mcp_prompt async

            convert_mcp_prompt(client: MCPClient, prompt: Prompt) -> StaticPrompt
            

            Convert MCP prompt to StaticPrompt.

            Source code in src/llmling_agent/mcp_server/manager.py
            30
            31
            32
            33
            34
            35
            36
            37
            38
            39
            40
            41
            async def convert_mcp_prompt(client: MCPClient, prompt: MCPPrompt) -> StaticPrompt:
                """Fetch an MCP prompt by name and turn it into a ``StaticPrompt``.

                Messages whose content is an embedded resource or an image are dropped;
                the text of every remaining message becomes a message with the
                ``"system"`` role. A placeholder description is used when the prompt
                has none.
                """
                from mcp.types import EmbeddedResource, ImageContent

                fetched = await client.get_prompt(prompt.name)
                skipped_types = (EmbeddedResource, ImageContent)
                converted = []
                for entry in fetched.messages:
                    if isinstance(entry.content, skipped_types):
                        continue
                    converted.append(PromptMessage(role="system", content=entry.content.text))
                description = prompt.description or "No description provided"
                return StaticPrompt(name=prompt.name, description=description, messages=converted)
            

            convert_mcp_resource async

            convert_mcp_resource(resource: Resource) -> ResourceInfo
            

            Convert MCP resource to ResourceInfo.

            Source code in src/llmling_agent/mcp_server/manager.py
            44
            45
            46
            47
            48
            async def convert_mcp_resource(resource: MCPResource) -> ResourceInfo:
                """Build a ``ResourceInfo`` from an MCP resource.

                The name and description are copied over; the URI is stored in its
                string form.
                """
                fields = {
                    "name": resource.name,
                    "uri": str(resource.uri),
                    "description": resource.description,
                }
                return ResourceInfo(**fields)