Skip to content

registry

Class info

Classes

Name Children Inherits
MCPServer
llmling_agent.mcp_server.registry
Represents a running MCP server instance.
    SSEMCPServerConfig
    llmling_agent_config.mcp_server
    MCP server using Server-Sent Events transport.
      • BaseMCPServerConfig
      ServerRegistry
      llmling_agent.mcp_server.registry
      Global registry for MCP server configurations and instances.
        StdioMCPServerConfig
        llmling_agent_config.mcp_server
        MCP server started via stdio.
          • BaseMCPServerConfig
          StreamableHTTPMCPServerConfig
          llmling_agent_config.mcp_server
          MCP server using StreamableHttp.
            • BaseMCPServerConfig

            🛈 DocStrings

            Global registry for MCP server instances.

            MCPServer

            Represents a running MCP server instance.

            Source code in src/llmling_agent/mcp_server/registry.py
             28
             29
             30
             31
             32
             33
             34
             35
             36
             37
             38
             39
             40
             41
             42
             43
             44
             45
             46
             47
             48
             49
             50
             51
             52
             53
             54
             55
             56
             57
             58
             59
             60
             61
             62
             63
             64
             65
             66
             67
             68
             69
             70
             71
             72
             73
             74
             75
             76
             77
             78
             79
             80
             81
             82
             83
             84
             85
             86
             87
             88
             89
             90
             91
             92
             93
             94
             95
             96
             97
             98
             99
            100
            101
            102
            103
            104
            105
            106
            107
            108
            109
            110
            111
            112
            113
            114
            115
            116
            117
            118
            119
            120
            121
            122
            123
            124
            125
            126
            127
            class MCPServer:
                """Represents a running MCP server instance."""
            
                def __init__(self, config: MCPServerConfig):
                    self.config = config
                    self._initialized = asyncio.Event()
                    self._shutdown = asyncio.Event()
                    self.name = config.name or "unnamed"
                    self.session: ClientSession | None = None
            
                async def initialize(self):
                    """Initialize server if not already done."""
                    if self._initialized.is_set():
                        return
            
                    try:
                        logger.info("Initializing MCP server: %s", self.name)
                        match self.config:
                            case StdioMCPServerConfig():
                                if not self.config.command or not self.config.args:
                                    msg = f"Command and args required for stdio: {self.name}"
                                    raise ValueError(msg)  # noqa: TRY301
            
                                command = shutil.which(self.config.command) or self.config.command
                                server_params = StdioServerParameters(
                                    command=command,
                                    args=self.config.args,
                                    env=self.config.get_env_vars(),
                                )
                                async with stdio_client(server_params) as (read_stream, write_stream):
                                    self.session = ClientSession(
                                        read_stream,
                                        write_stream,
                                        read_timeout_seconds=timedelta(seconds=self.config.timeout)
                                        if self.config.timeout
                                        else None,
                                    )
                                    await self.session.initialize()
            
                            case SSEMCPServerConfig():
                                if not self.config.url:
                                    msg = f"URL required for SSE transport: {self.name}"
                                    raise ValueError(msg)  # noqa: TRY301
            
                                async with sse_client(self.config.url) as (read_stream, write_stream):
                                    self.session = ClientSession(
                                        read_stream,
                                        write_stream,
                                        read_timeout_seconds=timedelta(seconds=self.config.timeout)
                                        if self.config.timeout
                                        else None,
                                    )
                                    await self.session.initialize()
            
                            case StreamableHTTPMCPServerConfig():
                                if not self.config.url:
                                    msg = f"URL required for SSE transport: {self.name}"
                                    raise ValueError(msg)  # noqa: TRY301
            
                                async with streamablehttp_client(self.config.url) as (
                                    read_stream,
                                    write_stream,
                                ):
                                    self.session = ClientSession(
                                        read_stream,
                                        write_stream,
                                        read_timeout_seconds=timedelta(seconds=self.config.timeout)
                                        if self.config.timeout
                                        else None,
                                    )
                                    await self.session.initialize()
            
                            case _:
                                msg = f"Unsupported transport: {self.config.transport}"
                                raise ValueError(msg)  # noqa: TRY301
            
                        self._initialized.set()
                        logger.info("MCP server initialized: %s", self.name)
            
                    except Exception as e:
                        logger.exception("Failed to initialize MCP server: %s", self.name)
                        msg = f"Server initialization failed: {e}"
                        raise RuntimeError(msg) from e
            
                async def shutdown(self):
                    """Clean shutdown of the server."""
                    if not self._initialized.is_set():
                        return
            
                    try:
                        logger.info("Shutting down MCP server: %s", self.name)
                        self._shutdown.set()
                        if self.session:
                            self.session = None
                        self._initialized.clear()
                        logger.info("MCP server shut down: %s", self.name)
                    except Exception as e:
                        logger.exception("Error during server shutdown: %s", self.name)
                        msg = f"Server shutdown failed: {e}"
                        raise RuntimeError(msg) from e
            

            initialize async

            initialize()
            

            Initialize server if not already done.

            Source code in src/llmling_agent/mcp_server/registry.py
             38
             39
             40
             41
             42
             43
             44
             45
             46
             47
             48
             49
             50
             51
             52
             53
             54
             55
             56
             57
             58
             59
             60
             61
             62
             63
             64
             65
             66
             67
             68
             69
             70
             71
             72
             73
             74
             75
             76
             77
             78
             79
             80
             81
             82
             83
             84
             85
             86
             87
             88
             89
             90
             91
             92
             93
             94
             95
             96
             97
             98
             99
            100
            101
            102
            103
            104
            105
            106
            107
            108
            109
            110
            async def initialize(self):
                """Initialize server if not already done."""
                if self._initialized.is_set():
                    return
            
                try:
                    logger.info("Initializing MCP server: %s", self.name)
                    match self.config:
                        case StdioMCPServerConfig():
                            if not self.config.command or not self.config.args:
                                msg = f"Command and args required for stdio: {self.name}"
                                raise ValueError(msg)  # noqa: TRY301
            
                            command = shutil.which(self.config.command) or self.config.command
                            server_params = StdioServerParameters(
                                command=command,
                                args=self.config.args,
                                env=self.config.get_env_vars(),
                            )
                            async with stdio_client(server_params) as (read_stream, write_stream):
                                self.session = ClientSession(
                                    read_stream,
                                    write_stream,
                                    read_timeout_seconds=timedelta(seconds=self.config.timeout)
                                    if self.config.timeout
                                    else None,
                                )
                                await self.session.initialize()
            
                        case SSEMCPServerConfig():
                            if not self.config.url:
                                msg = f"URL required for SSE transport: {self.name}"
                                raise ValueError(msg)  # noqa: TRY301
            
                            async with sse_client(self.config.url) as (read_stream, write_stream):
                                self.session = ClientSession(
                                    read_stream,
                                    write_stream,
                                    read_timeout_seconds=timedelta(seconds=self.config.timeout)
                                    if self.config.timeout
                                    else None,
                                )
                                await self.session.initialize()
            
                        case StreamableHTTPMCPServerConfig():
                            if not self.config.url:
                                msg = f"URL required for SSE transport: {self.name}"
                                raise ValueError(msg)  # noqa: TRY301
            
                            async with streamablehttp_client(self.config.url) as (
                                read_stream,
                                write_stream,
                            ):
                                self.session = ClientSession(
                                    read_stream,
                                    write_stream,
                                    read_timeout_seconds=timedelta(seconds=self.config.timeout)
                                    if self.config.timeout
                                    else None,
                                )
                                await self.session.initialize()
            
                        case _:
                            msg = f"Unsupported transport: {self.config.transport}"
                            raise ValueError(msg)  # noqa: TRY301
            
                    self._initialized.set()
                    logger.info("MCP server initialized: %s", self.name)
            
                except Exception as e:
                    logger.exception("Failed to initialize MCP server: %s", self.name)
                    msg = f"Server initialization failed: {e}"
                    raise RuntimeError(msg) from e
            

            shutdown async

            shutdown()
            

            Clean shutdown of the server.

            Source code in src/llmling_agent/mcp_server/registry.py
            112
            113
            114
            115
            116
            117
            118
            119
            120
            121
            122
            123
            124
            125
            126
            127
            async def shutdown(self):
                """Clean shutdown of the server.

                Returns immediately when the server is not marked as initialized.
                Otherwise flags the shutdown, forgets the session and clears the
                initialized marker.

                Raises:
                    RuntimeError: If any step of the shutdown raises.
                """
                already_running = self._initialized.is_set()
                if not already_running:
                    return

                server_name = self.name
                try:
                    logger.info("Shutting down MCP server: %s", server_name)
                    self._shutdown.set()
                    # Drop the reference to an existing session, if there is one.
                    if self.session:
                        self.session = None
                    self._initialized.clear()
                    logger.info("MCP server shut down: %s", server_name)
                except Exception as e:
                    logger.exception("Error during server shutdown: %s", server_name)
                    msg = f"Server shutdown failed: {e}"
                    raise RuntimeError(msg) from e
            

            ServerRegistry

            Global registry for MCP server configurations and instances.

            This is a singleton class that maintains references to all MCP server instances. It handles server lifecycle management, instance tracking with different configurations, and clean shutdown.

            Source code in src/llmling_agent/mcp_server/registry.py
            130
            131
            132
            133
            134
            135
            136
            137
            138
            139
            140
            141
            142
            143
            144
            145
            146
            147
            148
            149
            150
            151
            152
            153
            154
            155
            156
            157
            158
            159
            160
            161
            162
            163
            164
            165
            166
            167
            168
            169
            170
            171
            172
            173
            174
            175
            176
            177
            178
            179
            180
            181
            182
            183
            184
            185
            186
            187
            188
            189
            190
            191
            192
            193
            194
            195
            196
            197
            198
            199
            200
            201
            202
            203
            204
            205
            206
            207
            208
            209
            210
            211
            212
            213
            214
            class ServerRegistry:
                """Global registry for MCP server configurations and instances.

                This is a singleton class that maintains references to all MCP server
                instances. It handles:
                - Server lifecycle management
                - Instance tracking with different configurations
                - Clean shutdown

                Servers are keyed by their configuration object, so configurations
                must be hashable.
                """

                _instance: Self | None = None

                def __init__(self):
                    """Initialize registry - use get_registry() instead."""
                    # Strong references: config -> server created for it.
                    self._servers: dict[MCPServerConfig, MCPServer] = {}
                    # Serializes creation and shutdown of servers.
                    self._lock = asyncio.Lock()
                    # Use weak references to allow garbage collection
                    self._refs = weakref.WeakValueDictionary[MCPServerConfig, MCPServer]()

                @classmethod
                def get_registry(cls) -> Self:
                    """Get the global registry instance.

                    The instance is created on first access and reused afterwards.
                    """
                    if cls._instance is None:
                        cls._instance = cls()
                    return cls._instance

                async def get_or_create_server(
                    self,
                    config: MCPServerConfig,
                    **kwargs: Any,
                ) -> MCPServer:
                    """Get existing server or create new one.

                    Args:
                        config: Server configuration
                        **kwargs: Additional configuration overrides

                    Returns:
                        Running server instance

                    Raises:
                        RuntimeError: If server creation fails
                    """
                    async with self._lock:
                        # Apply overrides; the resulting config is the lookup key.
                        if kwargs:
                            config = config.model_copy(update=kwargs)

                        # Return existing server if available (single lookup).
                        existing = self._servers.get(config)
                        if existing is not None:
                            return existing

                        # Create new server; register it only once it is running.
                        try:
                            server = MCPServer(config)
                            await server.initialize()
                            self._servers[config] = server
                            self._refs[config] = server
                        except Exception as e:
                            logger.exception("Failed to create MCP server: %s", config.name)
                            msg = f"Server creation failed: {e}"
                            raise RuntimeError(msg) from e
                        else:
                            return server

                async def shutdown_server(self, config: MCPServerConfig):
                    """Shut down a specific server.

                    The server is removed from the registry before it is shut down.
                    Unknown configurations are ignored.
                    """
                    async with self._lock:
                        if server := self._servers.pop(config, None):
                            await server.shutdown()

                async def shutdown_all(self):
                    """Clean shutdown of all servers.

                    Every registered server gets a shutdown attempt even when an
                    earlier one fails, and the registry is emptied afterwards in
                    either case.

                    Raises:
                        Exception: The first error raised by a server's
                            ``shutdown()``, re-raised after all were attempted.
                    """
                    async with self._lock:
                        failures: list[Exception] = []
                        for server in list(self._servers.values()):
                            try:
                                await server.shutdown()
                            except Exception as exc:  # noqa: BLE001
                                # Keep going so one failure cannot leave the
                                # remaining servers running; re-raised below.
                                failures.append(exc)
                        self._servers.clear()
                        if failures:
                            raise failures[0]

                def get_server(self, config: MCPServerConfig) -> MCPServer | None:
                    """Get server by config if it exists."""
                    return self._servers.get(config)

                def __contains__(self, config: MCPServerConfig) -> bool:
                    """Check if server exists for this config."""
                    return config in self._servers
            

            __contains__

            __contains__(config: MCPServerConfig) -> bool
            

            Check if server exists for this config.

            Source code in src/llmling_agent/mcp_server/registry.py
            212
            213
            214
            def __contains__(self, config: MCPServerConfig) -> bool:
                """Check if server exists for this config."""
                return config in self._servers
            

            __init__

            __init__()
            

            Initialize registry - use get_registry() instead.

            Source code in src/llmling_agent/mcp_server/registry.py
            142
            143
            144
            145
            146
            147
            def __init__(self):
                """Set up empty registry state.

                Prefer ``get_registry()`` over constructing instances directly.
                """
                # Guards creation and shutdown of servers.
                self._lock = asyncio.Lock()
                # Strong mapping from configuration to its server instance.
                self._servers: dict[MCPServerConfig, MCPServer] = {}
                # Weakly referenced view of the same servers, so entries do not
                # keep a server alive on their own.
                self._refs = weakref.WeakValueDictionary[MCPServerConfig, MCPServer]()
            

            get_or_create_server async

            get_or_create_server(config: MCPServerConfig, **kwargs: Any) -> MCPServer
            

            Get existing server or create new one.

            Parameters:

            Name Type Description Default
            config MCPServerConfig

            Server configuration

            required
            **kwargs Any

            Additional configuration overrides

            {}

            Returns:

            Type Description
            MCPServer

            Running server instance

            Raises:

            Type Description
            RuntimeError

            If server creation fails

            Source code in src/llmling_agent/mcp_server/registry.py
            156
            157
            158
            159
            160
            161
            162
            163
            164
            165
            166
            167
            168
            169
            170
            171
            172
            173
            174
            175
            176
            177
            178
            179
            180
            181
            182
            183
            184
            185
            186
            187
            188
            189
            190
            191
            192
            193
            async def get_or_create_server(
                self,
                config: MCPServerConfig,
                **kwargs: Any,
            ) -> MCPServer:
                """Return the server registered for a configuration, creating it if needed.

                Args:
                    config: Base server configuration.
                    **kwargs: Field overrides applied to a copy of ``config``; the
                        resulting copy is what the registry is keyed on.

                Returns:
                    The already registered or newly initialized server.

                Raises:
                    RuntimeError: If creating or initializing the server fails.
                """
                async with self._lock:
                    effective = config.model_copy(update=kwargs) if kwargs else config

                    # Fast path: a server for this configuration already exists.
                    if effective in self._servers:
                        return self._servers[effective]

                    # Slow path: build, start and register a new server.
                    try:
                        created = MCPServer(effective)
                        await created.initialize()
                        self._servers[effective] = created
                        self._refs[effective] = created
                    except Exception as e:
                        logger.exception("Failed to create MCP server: %s", effective.name)
                        msg = f"Server creation failed: {e}"
                        raise RuntimeError(msg) from e
                    return created
            

            get_registry classmethod

            get_registry() -> Self
            

            Get the global registry instance.

            Source code in src/llmling_agent/mcp_server/registry.py
            149
            150
            151
            152
            153
            154
            @classmethod
            def get_registry(cls) -> Self:
                """Get the global registry instance."""
                if cls._instance is None:
                    cls._instance = cls()
                return cls._instance
            

            get_server

            get_server(config: MCPServerConfig) -> MCPServer | None
            

            Get server by config if it exists.

            Source code in src/llmling_agent/mcp_server/registry.py
            208
            209
            210
            def get_server(self, config: MCPServerConfig) -> MCPServer | None:
                """Look up the server registered for ``config``; ``None`` if absent."""
                try:
                    return self._servers[config]
                except KeyError:
                    return None
            

            shutdown_all async

            shutdown_all()
            

            Clean shutdown of all servers.

            Source code in src/llmling_agent/mcp_server/registry.py
            201
            202
            203
            204
            205
            206
            async def shutdown_all(self):
                """Clean shutdown of all servers."""
                async with self._lock:
                    for server in list(self._servers.values()):
                        await server.shutdown()
                    self._servers.clear()
            

            shutdown_server async

            shutdown_server(config: MCPServerConfig)
            

            Shut down a specific server.

            Source code in src/llmling_agent/mcp_server/registry.py
            195
            196
            197
            198
            199
            async def shutdown_server(self, config: MCPServerConfig):
                """Remove the server registered for ``config`` and shut it down.

                Configurations without a registered server are ignored.
                """
                async with self._lock:
                    server = self._servers.pop(config, None)
                    if not server:
                        return
                    await server.shutdown()