Skip to content

registry

Class info

Classes

Name Children Inherits
MCPServer
llmling_agent.mcp_server.registry
Represents a running MCP server instance.
    SSEMCPServerConfig
    llmling_agent_config.mcp_server
    MCP server using Server-Sent Events transport.
      • BaseMCPServerConfig
      ServerRegistry
      llmling_agent.mcp_server.registry
      Global registry for MCP server configurations and instances.
        StdioMCPServerConfig
        llmling_agent_config.mcp_server
        MCP server started via stdio.
          • BaseMCPServerConfig

          🛈 DocStrings

          Global registry for MCP server instances.

          MCPServer

          Represents a running MCP server instance.

          Source code in src/llmling_agent/mcp_server/registry.py
           26
           27
           28
           29
           30
           31
           32
           33
           34
           35
           36
           37
           38
           39
           40
           41
           42
           43
           44
           45
           46
           47
           48
           49
           50
           51
           52
           53
           54
           55
           56
           57
           58
           59
           60
           61
           62
           63
           64
           65
           66
           67
           68
           69
           70
           71
           72
           73
           74
           75
           76
           77
           78
           79
           80
           81
           82
           83
           84
           85
           86
           87
           88
           89
           90
           91
           92
           93
           94
           95
           96
           97
           98
           99
          100
          101
          102
          103
          104
          105
          106
          107
          class MCPServer:
              """Represents a running MCP server instance."""
          
              def __init__(self, config: MCPServerConfig):
                  self.config = config
                  self._initialized = asyncio.Event()
                  self._shutdown = asyncio.Event()
                  self.name = config.name or "unnamed"
                  self.session: ClientSession | None = None
          
              async def initialize(self):
                  """Initialize server if not already done."""
                  if self._initialized.is_set():
                      return
          
                  try:
                      logger.info("Initializing MCP server: %s", self.name)
                      match self.config:
                          case StdioMCPServerConfig():
                              if not self.config.command or not self.config.args:
                                  msg = f"Command and args required for stdio: {self.name}"
                                  raise ValueError(msg)  # noqa: TRY301
          
                              command = shutil.which(self.config.command) or self.config.command
                              server_params = StdioServerParameters(
                                  command=command,
                                  args=self.config.args,
                                  env=self.config.get_env_vars(),
                              )
                              async with stdio_client(server_params) as (read_stream, write_stream):
                                  self.session = ClientSession(
                                      read_stream,
                                      write_stream,
                                      read_timeout_seconds=timedelta(seconds=self.config.timeout)
                                      if self.config.timeout
                                      else None,
                                  )
                                  await self.session.initialize()
          
                          case SSEMCPServerConfig():
                              if not self.config.url:
                                  msg = f"URL required for SSE transport: {self.name}"
                                  raise ValueError(msg)  # noqa: TRY301
          
                              async with sse_client(self.config.url) as (read_stream, write_stream):
                                  self.session = ClientSession(
                                      read_stream,
                                      write_stream,
                                      read_timeout_seconds=timedelta(seconds=self.config.timeout)
                                      if self.config.timeout
                                      else None,
                                  )
                                  await self.session.initialize()
          
                          case _:
                              msg = f"Unsupported transport: {self.config.transport}"
                              raise ValueError(msg)  # noqa: TRY301
          
                      self._initialized.set()
                      logger.info("MCP server initialized: %s", self.name)
          
                  except Exception as e:
                      logger.exception("Failed to initialize MCP server: %s", self.name)
                      msg = f"Server initialization failed: {e}"
                      raise RuntimeError(msg) from e
          
              async def shutdown(self):
                  """Clean shutdown of the server."""
                  if not self._initialized.is_set():
                      return
          
                  try:
                      logger.info("Shutting down MCP server: %s", self.name)
                      self._shutdown.set()
                      if self.session:
                          self.session = None
                      self._initialized.clear()
                      logger.info("MCP server shut down: %s", self.name)
                  except Exception as e:
                      logger.exception("Error during server shutdown: %s", self.name)
                      msg = f"Server shutdown failed: {e}"
                      raise RuntimeError(msg) from e
          

          initialize async

          initialize()
          

          Initialize server if not already done.

          Source code in src/llmling_agent/mcp_server/registry.py
          36
          37
          38
          39
          40
          41
          42
          43
          44
          45
          46
          47
          48
          49
          50
          51
          52
          53
          54
          55
          56
          57
          58
          59
          60
          61
          62
          63
          64
          65
          66
          67
          68
          69
          70
          71
          72
          73
          74
          75
          76
          77
          78
          79
          80
          81
          82
          83
          84
          85
          86
          87
          88
          89
          90
          async def initialize(self):
              """Initialize server if not already done.

              Builds a ``ClientSession`` over the transport selected by the type of
              ``self.config`` (stdio or SSE), awaits its ``initialize()`` and then
              sets the ``_initialized`` event. Returns immediately when that event
              is already set.

              Raises:
                  RuntimeError: If validation or any setup step fails; the original
                      exception is chained as the cause.
              """
              if self._initialized.is_set():
                  return
          
              try:
                  logger.info("Initializing MCP server: %s", self.name)
                  match self.config:
                      case StdioMCPServerConfig():
                          # Both a command and a non-empty args list are required.
                          if not self.config.command or not self.config.args:
                              msg = f"Command and args required for stdio: {self.name}"
                              raise ValueError(msg)  # noqa: TRY301
          
                          # Prefer the resolved executable path; fall back to the
                          # configured value when it is not found on PATH.
                          command = shutil.which(self.config.command) or self.config.command
                          server_params = StdioServerParameters(
                              command=command,
                              args=self.config.args,
                              env=self.config.get_env_vars(),
                          )
                          # NOTE(review): this ``async with`` block ends before the
                          # method returns, so the transport context is exited while
                          # ``self.session`` still refers to its streams - confirm
                          # the session is still usable afterwards.
                          async with stdio_client(server_params) as (read_stream, write_stream):
                              self.session = ClientSession(
                                  read_stream,
                                  write_stream,
                                  # A falsy timeout (None or 0) means no read timeout.
                                  read_timeout_seconds=timedelta(seconds=self.config.timeout)
                                  if self.config.timeout
                                  else None,
                              )
                              await self.session.initialize()
          
                      case SSEMCPServerConfig():
                          if not self.config.url:
                              msg = f"URL required for SSE transport: {self.name}"
                              raise ValueError(msg)  # noqa: TRY301
          
                          # NOTE(review): same context-lifetime concern as the stdio
                          # branch above.
                          async with sse_client(self.config.url) as (read_stream, write_stream):
                              self.session = ClientSession(
                                  read_stream,
                                  write_stream,
                                  read_timeout_seconds=timedelta(seconds=self.config.timeout)
                                  if self.config.timeout
                                  else None,
                              )
                              await self.session.initialize()
          
                      # Any other config type is rejected.
                      case _:
                          msg = f"Unsupported transport: {self.config.transport}"
                          raise ValueError(msg)  # noqa: TRY301
          
                  self._initialized.set()
                  logger.info("MCP server initialized: %s", self.name)
          
              # Also catches the ValueErrors raised above and re-wraps them.
              except Exception as e:
                  logger.exception("Failed to initialize MCP server: %s", self.name)
                  msg = f"Server initialization failed: {e}"
                  raise RuntimeError(msg) from e
          

          shutdown async

          shutdown()
          

          Clean shutdown of the server.

          Source code in src/llmling_agent/mcp_server/registry.py
           92
           93
           94
           95
           96
           97
           98
           99
          100
          101
          102
          103
          104
          105
          106
          107
          async def shutdown(self):
              """Clean shutdown of the server."""
              if not self._initialized.is_set():
                  return
          
              try:
                  logger.info("Shutting down MCP server: %s", self.name)
                  self._shutdown.set()
                  if self.session:
                      self.session = None
                  self._initialized.clear()
                  logger.info("MCP server shut down: %s", self.name)
              except Exception as e:
                  logger.exception("Error during server shutdown: %s", self.name)
                  msg = f"Server shutdown failed: {e}"
                  raise RuntimeError(msg) from e
          

          ServerRegistry

          Global registry for MCP server configurations and instances.

          This is a singleton class that maintains references to all MCP server instances. It handles server lifecycle management, instance tracking with different configurations, and clean shutdown.

          Source code in src/llmling_agent/mcp_server/registry.py
          110
          111
          112
          113
          114
          115
          116
          117
          118
          119
          120
          121
          122
          123
          124
          125
          126
          127
          128
          129
          130
          131
          132
          133
          134
          135
          136
          137
          138
          139
          140
          141
          142
          143
          144
          145
          146
          147
          148
          149
          150
          151
          152
          153
          154
          155
          156
          157
          158
          159
          160
          161
          162
          163
          164
          165
          166
          167
          168
          169
          170
          171
          172
          173
          174
          175
          176
          177
          178
          179
          180
          181
          182
          183
          184
          185
          186
          187
          188
          189
          190
          191
          192
          193
          194
          class ServerRegistry:
              """Global registry for MCP server configurations and instances.
          
              This is a singleton class that maintains references to all MCP server
              instances. It handles:
              - Server lifecycle management
              - Instance tracking with different configurations
              - Clean shutdown
              """
          
              _instance: Self | None = None
          
              def __init__(self):
                  """Initialize registry - use get_registry() instead."""
                  self._servers: dict[MCPServerConfig, MCPServer] = {}
                  self._lock = asyncio.Lock()
                  # Use weak references to allow garbage collection
                  self._refs = weakref.WeakValueDictionary[MCPServerConfig, MCPServer]()
          
              @classmethod
              def get_registry(cls) -> Self:
                  """Get the global registry instance."""
                  if cls._instance is None:
                      cls._instance = cls()
                  return cls._instance
          
              async def get_or_create_server(
                  self,
                  config: MCPServerConfig,
                  **kwargs: Any,
              ) -> MCPServer:
                  """Get existing server or create new one.
          
                  Args:
                      config: Server configuration
                      **kwargs: Additional configuration overrides
          
                  Returns:
                      Running server instance
          
                  Raises:
                      RuntimeError: If server creation fails
                  """
                  async with self._lock:
                      # Apply overrides
                      if kwargs:
                          config = config.model_copy(update=kwargs)
          
                      # Return existing server if available
                      if config in self._servers:
                          return self._servers[config]
          
                      # Create new server
                      try:
                          server = MCPServer(config)
                          await server.initialize()
                          self._servers[config] = server
                          self._refs[config] = server
                      except Exception as e:
                          logger.exception("Failed to create MCP server: %s", config.name)
                          msg = f"Server creation failed: {e}"
                          raise RuntimeError(msg) from e
                      else:
                          return server
          
              async def shutdown_server(self, config: MCPServerConfig):
                  """Shut down a specific server."""
                  async with self._lock:
                      if server := self._servers.pop(config, None):
                          await server.shutdown()
          
              async def shutdown_all(self):
                  """Clean shutdown of all servers."""
                  async with self._lock:
                      for server in list(self._servers.values()):
                          await server.shutdown()
                      self._servers.clear()
          
              def get_server(self, config: MCPServerConfig) -> MCPServer | None:
                  """Get server by config if it exists."""
                  return self._servers.get(config)
          
              def __contains__(self, config: MCPServerConfig) -> bool:
                  """Check if server exists for this config."""
                  return config in self._servers
          

          __contains__

          __contains__(config: MCPServerConfig) -> bool
          

          Check if server exists for this config.

          Source code in src/llmling_agent/mcp_server/registry.py
          192
          193
          194
          def __contains__(self, config: MCPServerConfig) -> bool:
              """Check if server exists for this config."""
              return config in self._servers
          

          __init__

          __init__()
          

          Initialize registry - use get_registry() instead.

          Source code in src/llmling_agent/mcp_server/registry.py
          122
          123
          124
          125
          126
          127
          def __init__(self):
              """Set up empty registry state; prefer ``get_registry()`` over direct use."""
              # Guards the async methods that create or shut down servers.
              self._lock = asyncio.Lock()
              # Servers keyed by their configuration.
              self._servers: dict[MCPServerConfig, MCPServer] = {}
              # Weak view of the same servers, so it never keeps one alive by itself.
              self._refs = weakref.WeakValueDictionary[MCPServerConfig, MCPServer]()
          

          get_or_create_server async

          get_or_create_server(config: MCPServerConfig, **kwargs: Any) -> MCPServer
          

          Get existing server or create new one.

          Parameters:

          Name Type Description Default
          config MCPServerConfig

          Server configuration

          required
          **kwargs Any

          Additional configuration overrides

          {}

          Returns:

          Type Description
          MCPServer

          Running server instance

          Raises:

          Type Description
          RuntimeError

          If server creation fails

          Source code in src/llmling_agent/mcp_server/registry.py
          136
          137
          138
          139
          140
          141
          142
          143
          144
          145
          146
          147
          148
          149
          150
          151
          152
          153
          154
          155
          156
          157
          158
          159
          160
          161
          162
          163
          164
          165
          166
          167
          168
          169
          170
          171
          172
          173
          async def get_or_create_server(
              self,
              config: MCPServerConfig,
              **kwargs: Any,
          ) -> MCPServer:
              """Get existing server or create new one.
          
              Args:
                  config: Server configuration
                  **kwargs: Additional configuration overrides
          
              Returns:
                  Running server instance
          
              Raises:
                  RuntimeError: If server creation fails
              """
              async with self._lock:
                  # Apply overrides
                  if kwargs:
                      config = config.model_copy(update=kwargs)
          
                  # Return existing server if available
                  if config in self._servers:
                      return self._servers[config]
          
                  # Create new server
                  try:
                      server = MCPServer(config)
                      await server.initialize()
                      self._servers[config] = server
                      self._refs[config] = server
                  except Exception as e:
                      logger.exception("Failed to create MCP server: %s", config.name)
                      msg = f"Server creation failed: {e}"
                      raise RuntimeError(msg) from e
                  else:
                      return server
          

          get_registry classmethod

          get_registry() -> Self
          

          Get the global registry instance.

          Source code in src/llmling_agent/mcp_server/registry.py
          129
          130
          131
          132
          133
          134
          @classmethod
          def get_registry(cls) -> Self:
              """Get the global registry instance."""
              if cls._instance is None:
                  cls._instance = cls()
              return cls._instance
          

          get_server

          get_server(config: MCPServerConfig) -> MCPServer | None
          

          Get server by config if it exists.

          Source code in src/llmling_agent/mcp_server/registry.py
          188
          189
          190
          def get_server(self, config: MCPServerConfig) -> MCPServer | None:
              """Get server by config if it exists."""
              return self._servers.get(config)
          

          shutdown_all async

          shutdown_all()
          

          Clean shutdown of all servers.

          Source code in src/llmling_agent/mcp_server/registry.py
          181
          182
          183
          184
          185
          186
          async def shutdown_all(self):
              """Clean shutdown of all servers."""
              async with self._lock:
                  for server in list(self._servers.values()):
                      await server.shutdown()
                  self._servers.clear()
          

          shutdown_server async

          shutdown_server(config: MCPServerConfig)
          

          Shut down a specific server.

          Source code in src/llmling_agent/mcp_server/registry.py
          175
          176
          177
          178
          179
          async def shutdown_server(self, config: MCPServerConfig):
              """Shut down a specific server."""
              async with self._lock:
                  if server := self._servers.pop(config, None):
                      await server.shutdown()