Skip to content

manager

Class info

Classes

Name Children Inherits
AggregatingResourceProvider
llmling_agent.resource_providers.aggregating
Provider that combines resources from multiple providers.
AudioBase64Content
llmling_agent.models.content
Audio from base64 data.
    BaseMCPServerConfig
    llmling_agent_config.mcp_server
    Base model for MCP server configuration.
    • StdioMCPServerConfig
    • SSEMCPServerConfig
    • StreamableHTTPMCPServerConfig
    ImageBase64Content
    llmling_agent.models.content
    Image from base64 data.
      MCPManager
      llmling_agent.mcp_server.manager
      Manages MCP server connections and distributes resource providers.
        MCPResourceProvider
        llmling_agent.resource_providers.mcp_provider
        Resource provider for a single MCP server.
          ResourceProvider
          llmling_agent.resource_providers.base
          Base class for resource providers.

          🛈 DocStrings

          MCP server management for LLMling agents.

          MCPManager

          Manages MCP server connections and distributes resource providers.

          Source code in src/llmling_agent/mcp_server/manager.py
           33
           34
           35
           36
           37
           38
           39
           40
           41
           42
           43
           44
           45
           46
           47
           48
           49
           50
           51
           52
           53
           54
           55
           56
           57
           58
           59
           60
           61
           62
           63
           64
           65
           66
           67
           68
           69
           70
           71
           72
           73
           74
           75
           76
           77
           78
           79
           80
           81
           82
           83
           84
           85
           86
           87
           88
           89
           90
           91
           92
           93
           94
           95
           96
           97
           98
           99
          100
          101
          102
          103
          104
          105
          106
          107
          108
          109
          110
          111
          112
          113
          114
          115
          116
          117
          118
          119
          120
          121
          122
          123
          124
          125
          126
          127
          128
          129
          130
          131
          132
          133
          134
          135
          136
          137
          138
          139
          140
          141
          142
          143
          144
          145
          146
          147
          148
          149
          150
          151
          152
          153
          154
          155
          156
          157
          158
          159
          160
          161
          162
          163
          164
          165
          166
          167
          168
          169
          170
          171
          172
          173
          174
          175
          176
          177
          178
          179
          180
          181
          182
          183
          184
          185
          186
          187
          188
          189
          190
          191
          192
          193
          194
          195
          196
          197
          198
          199
          200
          201
          202
          203
          class MCPManager:
              """Manages MCP server connections and distributes resource providers."""
          
              def __init__(
                  self,
                  name: str = "mcp",
                  owner: str | None = None,
                  servers: Sequence[MCPServerConfig | str] | None = None,
                  accessible_roots: list[str] | None = None,
              ) -> None:
                  self.name = name
                  self.owner = owner
                  self.servers: list[MCPServerConfig] = []
                  for server in servers or []:
                      self.add_server_config(server)
                  self.providers: list[MCPResourceProvider] = []
                  self.aggregating_provider = AggregatingResourceProvider(
                      providers=cast(list[ResourceProvider], self.providers),
                      name=f"{name}_aggregated",
                  )
                  self.exit_stack = AsyncExitStack()
                  self._accessible_roots = accessible_roots
          
              def add_server_config(self, cfg: MCPServerConfig | str) -> None:
                  """Add a new MCP server to the manager."""
                  resolved = BaseMCPServerConfig.from_string(cfg) if isinstance(cfg, str) else cfg
                  self.servers.append(resolved)
          
              def __repr__(self) -> str:
                  return f"MCPManager(name={self.name!r}, servers={len(self.servers)})"
          
              async def __aenter__(self) -> Self:
                  try:
                      if tasks := [self._setup_server(server) for server in self.servers]:
                          await asyncio.gather(*tasks)
                  except Exception as e:
                      await self.__aexit__(type(e), e, e.__traceback__)
                      msg = "Failed to initialize MCP manager"
                      raise RuntimeError(msg) from e
          
                  return self
          
              async def __aexit__(
                  self,
                  exc_type: type[BaseException] | None,
                  exc_val: BaseException | None,
                  exc_tb: TracebackType | None,
              ) -> None:
                  await self.cleanup()
          
              async def _sampling_callback(
                  self,
                  messages: list[SamplingMessage],
                  params: types.CreateMessageRequestParams,
                  context: RequestContext[Any, Any, Any],
              ) -> str:
                  """Handle MCP sampling by creating a new agent with specified preferences."""
                  from mcp import types
          
                  from llmling_agent.agent import Agent
          
                  # Convert messages to prompts for the agent
                  prompts: list[BaseContent | str] = []
                  for mcp_msg in messages:
                      match mcp_msg.content:
                          case types.TextContent(text=text):
                              prompts.append(text)
                          case types.ImageContent(data=data, mimeType=mime_type):
                              our_image = ImageBase64Content(data=data, mime_type=mime_type)
                              prompts.append(our_image)
                          case types.AudioContent(data=data, mimeType=mime_type):
                              fmt = mime_type.removeprefix("audio/")
                              our_audio = AudioBase64Content(data=data, format=fmt)
                              prompts.append(our_audio)
          
                  # Extract model from preferences
                  model = None
                  if (prefs := params.modelPreferences) and prefs.hints and prefs.hints[0].name:
                      model = prefs.hints[0].name
                  # Create usage limits from sampling parameters
                  limits = UsageLimits(output_tokens_limit=params.maxTokens, request_limit=1)
                  # TODO: Apply temperature from params.temperature
                  sys_prompt = params.systemPrompt or ""
                  agent = Agent(name="sampling-agent", model=model, system_prompt=sys_prompt, session=False)
                  try:
                      async with agent:
                          result = await agent.run(*prompts, store_history=False, usage_limits=limits)
                          return result.content
          
                  except Exception as e:
                      logger.exception("Sampling failed")
                      return f"Sampling failed: {e!s}"
          
              async def _setup_server(self, config: MCPServerConfig) -> None:
                  """Set up a single MCP server resource provider."""
                  if not config.enabled:
                      return
          
                  provider = MCPResourceProvider(
                      server=config,
                      name=f"{self.name}_{config.client_id}",
                      owner=self.owner,
                      source="pool" if self.owner == "pool" else "node",
                      sampling_callback=self._sampling_callback,
                      accessible_roots=self._accessible_roots,
                  )
                  provider = await self.exit_stack.enter_async_context(provider)
                  self.providers.append(provider)
          
              def get_mcp_providers(self) -> list[MCPResourceProvider]:
                  """Get all MCP resource providers managed by this manager."""
                  return list(self.providers)
          
              def get_aggregating_provider(self) -> AggregatingResourceProvider:
                  """Get the aggregating provider that contains all MCP providers."""
                  return self.aggregating_provider
          
              async def setup_server_runtime(self, config: MCPServerConfig) -> MCPResourceProvider:
                  """Set up a single MCP server at runtime while manager is running.
          
                  Returns:
                      The newly created and initialized MCPResourceProvider
                  """
                  if not config.enabled:
                      msg = f"Server config {config.client_id} is disabled"
                      raise ValueError(msg)
          
                  # Add the config first
                  self.add_server_config(config)
                  provider = MCPResourceProvider(
                      server=config,
                      name=f"{self.name}_{config.client_id}",
                      owner=self.owner,
                      source="pool" if self.owner == "pool" else "node",
                      sampling_callback=self._sampling_callback,
                      accessible_roots=self._accessible_roots,
                  )
                  provider = await self.exit_stack.enter_async_context(provider)
                  self.providers.append(provider)
                  # Note: AggregatingResourceProvider automatically sees the new provider
                  # since it references self.providers list
          
                  return provider
          
              async def cleanup(self) -> None:
                  """Clean up all MCP connections and providers."""
                  try:
                      try:
                          # Clean up exit stack (which includes MCP providers)
                          await self.exit_stack.aclose()
                      except RuntimeError as e:
                          if "different task" in str(e):
                              # Handle task context mismatch
                              current_task = asyncio.current_task()
                              if current_task:
                                  loop = asyncio.get_running_loop()
                                  await loop.create_task(self.exit_stack.aclose())
                          else:
                              raise
          
                      self.providers.clear()
          
                  except Exception as e:
                      msg = "Error during MCP manager cleanup"
                      logger.exception(msg, exc_info=e)
                      raise RuntimeError(msg) from e
          
              @property
              def active_servers(self) -> list[str]:
                  """Get IDs of active servers."""
                  return [provider.server.client_id for provider in self.providers]
          

          active_servers property

          active_servers: list[str]
          

          Get IDs of active servers.

          add_server_config

          add_server_config(cfg: MCPServerConfig | str) -> None
          

          Add a new MCP server to the manager.

          Source code in src/llmling_agent/mcp_server/manager.py
          56
          57
          58
          59
          def add_server_config(self, cfg: MCPServerConfig | str) -> None:
              """Add a new MCP server to the manager."""
              resolved = BaseMCPServerConfig.from_string(cfg) if isinstance(cfg, str) else cfg
              self.servers.append(resolved)
          

          cleanup async

          cleanup() -> None
          

          Clean up all MCP connections and providers.

          Source code in src/llmling_agent/mcp_server/manager.py
          177
          178
          179
          180
          181
          182
          183
          184
          185
          186
          187
          188
          189
          190
          191
          192
          193
          194
          195
          196
          197
          198
          async def cleanup(self) -> None:
              """Clean up all MCP connections and providers."""
              try:
                  try:
                      # Clean up exit stack (which includes MCP providers)
                      await self.exit_stack.aclose()
                  except RuntimeError as e:
                      if "different task" in str(e):
                          # Handle task context mismatch
                          current_task = asyncio.current_task()
                          if current_task:
                              loop = asyncio.get_running_loop()
                              await loop.create_task(self.exit_stack.aclose())
                      else:
                          raise
          
                  self.providers.clear()
          
              except Exception as e:
                  msg = "Error during MCP manager cleanup"
                  logger.exception(msg, exc_info=e)
                  raise RuntimeError(msg) from e
          

          get_aggregating_provider

          get_aggregating_provider() -> AggregatingResourceProvider
          

          Get the aggregating provider that contains all MCP providers.

          Source code in src/llmling_agent/mcp_server/manager.py
          146
          147
          148
          def get_aggregating_provider(self) -> AggregatingResourceProvider:
              """Get the aggregating provider that contains all MCP providers."""
              return self.aggregating_provider
          

          get_mcp_providers

          get_mcp_providers() -> list[MCPResourceProvider]
          

          Get all MCP resource providers managed by this manager.

          Source code in src/llmling_agent/mcp_server/manager.py
          142
          143
          144
          def get_mcp_providers(self) -> list[MCPResourceProvider]:
              """Get all MCP resource providers managed by this manager."""
              return list(self.providers)
          

          setup_server_runtime async

          setup_server_runtime(config: MCPServerConfig) -> MCPResourceProvider
          

          Set up a single MCP server at runtime while manager is running.

          Returns:

          Type Description
          MCPResourceProvider

          The newly created and initialized MCPResourceProvider

          Source code in src/llmling_agent/mcp_server/manager.py
          150
          151
          152
          153
          154
          155
          156
          157
          158
          159
          160
          161
          162
          163
          164
          165
          166
          167
          168
          169
          170
          171
          172
          173
          174
          175
          async def setup_server_runtime(self, config: MCPServerConfig) -> MCPResourceProvider:
              """Set up a single MCP server at runtime while manager is running.
          
              Returns:
                  The newly created and initialized MCPResourceProvider
              """
              if not config.enabled:
                  msg = f"Server config {config.client_id} is disabled"
                  raise ValueError(msg)
          
              # Add the config first
              self.add_server_config(config)
              provider = MCPResourceProvider(
                  server=config,
                  name=f"{self.name}_{config.client_id}",
                  owner=self.owner,
                  source="pool" if self.owner == "pool" else "node",
                  sampling_callback=self._sampling_callback,
                  accessible_roots=self._accessible_roots,
              )
              provider = await self.exit_stack.enter_async_context(provider)
              self.providers.append(provider)
              # Note: AggregatingResourceProvider automatically sees the new provider
              # since it references self.providers list
          
              return provider