Skip to content

mcp_provider

Class info

Classes

Name Children Inherits
BaseMCPServerConfig
llmling_agent_config.mcp_server
Base model for MCP server configuration.
  • StdioMCPServerConfig
  • SSEMCPServerConfig
  • StreamableHTTPMCPServerConfig
MCPResourceProvider
llmling_agent.resource_providers.mcp_provider
Resource provider for a single MCP server.
    ResourceInfo
    llmling_agent_config.resources
    Information about an available resource.
      ResourceProvider
      llmling_agent.resource_providers.base
      Base class for resource providers.

      🛈 DocStrings

      Tool management for LLMling agents.

      MCPResourceProvider

      Bases: ResourceProvider

      Resource provider for a single MCP server.

      Source code in src/llmling_agent/resource_providers/mcp_provider.py
       32
       33
       34
       35
       36
       37
       38
       39
       40
       41
       42
       43
       44
       45
       46
       47
       48
       49
       50
       51
       52
       53
       54
       55
       56
       57
       58
       59
       60
       61
       62
       63
       64
       65
       66
       67
       68
       69
       70
       71
       72
       73
       74
       75
       76
       77
       78
       79
       80
       81
       82
       83
       84
       85
       86
       87
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      169
      170
      171
      172
      173
      174
      175
      176
      177
      178
      179
      180
      181
      182
      183
      184
      185
      186
      187
      188
      189
      190
      191
      192
      193
      194
      195
      196
      197
      198
      199
      200
      201
      202
      203
      204
      205
      206
      207
      208
      209
      210
      211
      212
      213
      214
      215
      216
      217
      218
      219
      220
      221
      222
      223
      224
      225
      226
      class MCPResourceProvider(ResourceProvider):
          """Resource provider for a single MCP server.

          Wraps one ``MCPClient`` and exposes its tools, prompts and resources
          through three lazily filled caches. A cache value of ``None`` means
          "not fetched yet"; the change callbacks registered on the client reset
          the matching cache to ``None`` so the next ``get_*`` call fetches again.
          """

          def __init__(
              self,
              server: MCPServerConfig | str,
              name: str = "mcp",
              owner: str | None = None,
              context: NodeContext | None = None,
              source: Literal["pool", "node"] = "node",
              elicitation_callback: ElicitationHandler | None = None,
              sampling_callback: ClientSamplingHandler | None = None,
              progress_handler: RichProgressCallback | None = None,
              accessible_roots: list[str] | None = None,
          ) -> None:
              """Create the provider and its (not yet connected) MCP client.

              Args:
                  server: Server configuration, or a string that is parsed with
                      ``BaseMCPServerConfig.from_string``.
                  name: Provider name, passed to the base class.
                  owner: Provider owner, passed to the base class.
                  context: Stored on the instance as ``self.context``.
                  source: Stored on the instance as ``self.source`` and shown in
                      ``repr()``.
                  elicitation_callback: Forwarded to ``MCPClient``.
                  sampling_callback: Forwarded to ``MCPClient``.
                  progress_handler: Forwarded to ``MCPClient``.
                  accessible_roots: Forwarded to ``MCPClient``.
              """
              # Imported here rather than at module level (presumably to avoid an
              # import cycle - TODO confirm).
              from llmling_agent.mcp_server import MCPClient

              super().__init__(name, owner=owner)
              self.server = (
                  BaseMCPServerConfig.from_string(server) if isinstance(server, str) else server
              )
              self.context = context
              self.source = source
              self.exit_stack = AsyncExitStack()
              self._progress_handler = progress_handler
              self._accessible_roots = accessible_roots
              self._elicitation_callback = elicitation_callback
              self._sampling_callback = sampling_callback

              # Tool caching
              self._tools_cache: list[Tool] | None = None
              # Tool name -> enabled flag, remembered across cache invalidations so a
              # rebuilt cache can restore the flags that were set on the old objects.
              self._saved_enabled_states: dict[str, bool] = {}

              # Prompt caching
              self._prompts_cache: list[MCPClientPrompt] | None = None

              # Resource caching
              self._resources_cache: list[ResourceInfo] | None = None

              self.client = MCPClient(
                  config=self.server,
                  elicitation_callback=self._elicitation_callback,
                  sampling_callback=self._sampling_callback,
                  progress_handler=self._progress_handler,
                  accessible_roots=self._accessible_roots,
                  tool_change_callback=self._on_tools_changed,
                  prompt_change_callback=self._on_prompts_changed,
                  resource_change_callback=self._on_resources_changed,
              )

          def __repr__(self) -> str:
              """Return a debug representation showing the server config and source."""
              return f"MCPResourceProvider({self.server!r}, source={self.source!r})"

          async def __aenter__(self) -> Self:
              """Enter the MCP client's async context via the exit stack.

              Raises:
                  RuntimeError: If entering the client context fails. Cleanup through
                      ``__aexit__`` is attempted first.
              """
              try:
                  await self.exit_stack.enter_async_context(self.client)
              except Exception as e:
                  # Clean up in case of error
                  await self.__aexit__(type(e), e, e.__traceback__)
                  msg = "Failed to initialize MCP manager"
                  raise RuntimeError(msg) from e

              return self

          async def __aexit__(
              self,
              exc_type: type[BaseException] | None,
              exc_val: BaseException | None,
              exc_tb: TracebackType | None,
          ) -> None:
              """Close the exit stack, which tears down the MCP client.

              A ``RuntimeError`` whose message contains "different task" is treated
              as a task-context mismatch: the close is retried inside a newly created
              task on the running loop. If there is no current task in that case the
              error is dropped without a retry.

              Raises:
                  RuntimeError: If cleanup fails for any other reason; the original
                      exception is logged and chained.
              """
              try:
                  try:
                      # Clean up exit stack (which includes MCP clients)
                      await self.exit_stack.aclose()
                  except RuntimeError as e:
                      if "different task" in str(e):
                          # Handle task context mismatch
                          current_task = asyncio.current_task()
                          if current_task:
                              loop = asyncio.get_running_loop()
                              await loop.create_task(self.exit_stack.aclose())
                      else:
                          raise

              except Exception as e:
                  msg = "Error during MCP manager cleanup"
                  logger.exception(msg, exc_info=e)
                  raise RuntimeError(msg) from e

          async def _on_tools_changed(self) -> None:
              """Callback when tools change on the MCP server.

              Remembers the enabled flag of every currently cached tool, then
              invalidates the tools cache.
              """
              logger.info("MCP tool list changed, refreshing provider cache")
              # Merge into the saved map instead of replacing it. The cache is None
              # when a second notification arrives before the cache was rebuilt, and
              # [] after a failed refresh; replacing the map in either situation
              # would wipe every flag remembered so far.
              for tool in self._tools_cache or ():
                  self._saved_enabled_states[tool.name] = tool.enabled
              self._tools_cache = None

          async def _on_prompts_changed(self) -> None:
              """Callback when prompts change on the MCP server."""
              logger.info("MCP prompt list changed, refreshing provider cache")
              self._prompts_cache = None

          async def _on_resources_changed(self) -> None:
              """Callback when resources change on the MCP server."""
              logger.info("MCP resource list changed, refreshing provider cache")
              self._resources_cache = None

          async def refresh_tools_cache(self) -> None:
              """Refresh the tools cache by fetching from client.

              Tools that fail to convert are logged and skipped. Saved enabled flags
              are re-applied by tool name. If anything else fails, the error is
              logged and the cache becomes ``[]``; since ``[]`` is not ``None``,
              ``get_tools`` will not fetch again until the cache is invalidated or
              this method is called explicitly.
              """
              try:
                  # Get fresh tools from client
                  mcp_tools = await self.client.list_tools()
                  all_tools: list[Tool] = []

                  for tool in mcp_tools:
                      try:
                          tool_info = self.client.convert_tool(tool)
                          all_tools.append(tool_info)
                      except Exception:
                          logger.exception("Failed to create MCP tool", name=tool.name)
                          continue

                  # Restore enabled states from saved states
                  for tool_info in all_tools:
                      if tool_info.name in self._saved_enabled_states:
                          tool_info.enabled = self._saved_enabled_states[tool_info.name]

                  self._tools_cache = all_tools
                  logger.debug("Refreshed MCP tools cache", num_tools=len(all_tools))
              except Exception:
                  logger.exception("Failed to refresh MCP tools cache")
                  self._tools_cache = []

          async def get_tools(self) -> list[Tool]:
              """Get cached tools, refreshing if necessary.

              Returns:
                  The cached tools, or an empty list when the cache is empty.
              """
              if self._tools_cache is None:
                  await self.refresh_tools_cache()

              return self._tools_cache or []

          async def refresh_prompts_cache(self) -> None:
              """Refresh the prompts cache by fetching from client.

              Prompts that fail to convert are logged and skipped. If anything else
              fails, the error is logged and the cache becomes ``[]``.
              """
              from llmling_agent.prompts.prompts import MCPClientPrompt

              try:
                  result = await self.client.list_prompts()
                  all_prompts: list[MCPClientPrompt] = []

                  for prompt in result:
                      try:
                          converted = MCPClientPrompt.from_fastmcp(self.client, prompt)
                          all_prompts.append(converted)
                      except Exception:
                          logger.exception("Failed to convert prompt", name=prompt.name)
                          continue

                  self._prompts_cache = all_prompts
                  logger.debug("Refreshed MCP prompts cache", num_prompts=len(all_prompts))
              except Exception:
                  logger.exception("Failed to refresh MCP prompts cache")
                  self._prompts_cache = []

          async def get_prompts(self) -> list[MCPClientPrompt]:  # type: ignore
              """Get cached prompts, refreshing if necessary.

              Returns:
                  The cached prompts, or an empty list when the cache is empty.
              """
              if self._prompts_cache is None:
                  await self.refresh_prompts_cache()

              return self._prompts_cache or []

          async def refresh_resources_cache(self) -> None:
              """Refresh the resources cache by fetching from client.

              Resources that fail to convert are logged and skipped. If anything
              else fails, the error is logged and the cache becomes ``[]``.
              """
              try:
                  result = await self.client.list_resources()
                  all_resources: list[ResourceInfo] = []

                  for resource in result:
                      try:
                          converted = await ResourceInfo.from_mcp_resource(resource)
                          all_resources.append(converted)
                      except Exception:
                          logger.exception("Failed to convert resource", name=resource.name)
                          continue

                  self._resources_cache = all_resources
                  logger.debug(
                      "Refreshed MCP resources cache", num_resources=len(all_resources)
                  )
              except Exception:
                  logger.exception("Failed to refresh MCP resources cache")
                  self._resources_cache = []

          async def get_resources(self) -> list[ResourceInfo]:
              """Get cached resources, refreshing if necessary.

              Returns:
                  The cached resources, or an empty list when the cache is empty.
              """
              if self._resources_cache is None:
                  await self.refresh_resources_cache()

              return self._resources_cache or []
      

      get_prompts async

      get_prompts() -> list[MCPClientPrompt]
      

      Get cached prompts, refreshing if necessary.

      Source code in src/llmling_agent/resource_providers/mcp_provider.py
      192
      193
      194
      195
      196
      197
      async def get_prompts(self) -> list[MCPClientPrompt]:  # type: ignore
          """Return the cached prompts, fetching them first when nothing is cached.

          A cache value of ``None`` triggers ``refresh_prompts_cache``; an empty
          cache yields a new empty list.
          """
          never_fetched = self._prompts_cache is None
          if never_fetched:
              await self.refresh_prompts_cache()

          cached = self._prompts_cache
          return cached if cached else []
      

      get_resources async

      get_resources() -> list[ResourceInfo]
      

      Get cached resources, refreshing if necessary.

      Source code in src/llmling_agent/resource_providers/mcp_provider.py
      221
      222
      223
      224
      225
      226
      async def get_resources(self) -> list[ResourceInfo]:
          """Return the cached resources, fetching them first when nothing is cached.

          A cache value of ``None`` triggers ``refresh_resources_cache``; an empty
          cache yields a new empty list.
          """
          never_fetched = self._resources_cache is None
          if never_fetched:
              await self.refresh_resources_cache()

          cached = self._resources_cache
          return cached if cached else []
      

      get_tools async

      get_tools() -> list[Tool]
      

      Get cached tools, refreshing if necessary.

      Source code in src/llmling_agent/resource_providers/mcp_provider.py
      163
      164
      165
      166
      167
      168
      async def get_tools(self) -> list[Tool]:
          """Return the cached tools, fetching them first when nothing is cached.

          A cache value of ``None`` triggers ``refresh_tools_cache``; an empty
          cache yields a new empty list.
          """
          never_fetched = self._tools_cache is None
          if never_fetched:
              await self.refresh_tools_cache()

          cached = self._tools_cache
          return cached if cached else []
      

      refresh_prompts_cache async

      refresh_prompts_cache() -> None
      

      Refresh the prompts cache by fetching from client.

      Source code in src/llmling_agent/resource_providers/mcp_provider.py
      170
      171
      172
      173
      174
      175
      176
      177
      178
      179
      180
      181
      182
      183
      184
      185
      186
      187
      188
      189
      190
      async def refresh_prompts_cache(self) -> None:
          """Rebuild the prompts cache from the MCP client's prompt listing.

          Each listed prompt is converted with ``MCPClientPrompt.from_fastmcp``;
          a prompt whose conversion raises is logged and left out. Any other
          failure is logged and leaves the cache as an empty list.
          """
          from llmling_agent.prompts.prompts import MCPClientPrompt

          try:
              listed = await self.client.list_prompts()
              fresh_prompts: list[MCPClientPrompt] = []

              for raw_prompt in listed:
                  try:
                      fresh_prompts.append(
                          MCPClientPrompt.from_fastmcp(self.client, raw_prompt)
                      )
                  except Exception:
                      # One bad prompt must not discard the rest of the listing.
                      logger.exception("Failed to convert prompt", name=raw_prompt.name)

              self._prompts_cache = fresh_prompts
              logger.debug("Refreshed MCP prompts cache", num_prompts=len(fresh_prompts))
          except Exception:
              logger.exception("Failed to refresh MCP prompts cache")
              self._prompts_cache = []
      

      refresh_resources_cache async

      refresh_resources_cache() -> None
      

      Refresh the resources cache by fetching from client.

      Source code in src/llmling_agent/resource_providers/mcp_provider.py
      199
      200
      201
      202
      203
      204
      205
      206
      207
      208
      209
      210
      211
      212
      213
      214
      215
      216
      217
      218
      219
      async def refresh_resources_cache(self) -> None:
          """Rebuild the resources cache from the MCP client's resource listing.

          Each listed resource is converted with the awaitable
          ``ResourceInfo.from_mcp_resource``; a resource whose conversion raises
          is logged and left out. Any other failure is logged and leaves the
          cache as an empty list.
          """
          try:
              listed = await self.client.list_resources()
              fresh_resources: list[ResourceInfo] = []

              for raw_resource in listed:
                  try:
                      info = await ResourceInfo.from_mcp_resource(raw_resource)
                      fresh_resources.append(info)
                  except Exception:
                      # One bad resource must not discard the rest of the listing.
                      logger.exception(
                          "Failed to convert resource", name=raw_resource.name
                      )

              self._resources_cache = fresh_resources
              resource_count = len(fresh_resources)
              logger.debug("Refreshed MCP resources cache", num_resources=resource_count)
          except Exception:
              logger.exception("Failed to refresh MCP resources cache")
              self._resources_cache = []
      

      refresh_tools_cache async

      refresh_tools_cache() -> None
      

      Refresh the tools cache by fetching from client.

      Source code in src/llmling_agent/resource_providers/mcp_provider.py
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      async def refresh_tools_cache(self) -> None:
          """Rebuild the tools cache from the MCP client's tool listing.

          Each listed tool is converted with ``self.client.convert_tool``; a tool
          whose conversion raises is logged and left out. After all conversions,
          enabled flags found in ``self._saved_enabled_states`` are re-applied by
          tool name. Any other failure is logged and leaves the cache as an
          empty list.
          """
          try:
              # Ask the client for the current tool listing.
              listed = await self.client.list_tools()
              fresh_tools: list[Tool] = []

              for raw_tool in listed:
                  try:
                      fresh_tools.append(self.client.convert_tool(raw_tool))
                  except Exception:
                      # One bad tool must not discard the rest of the listing.
                      logger.exception("Failed to create MCP tool", name=raw_tool.name)

              # Re-apply the enabled flags remembered from the previous cache.
              remembered = self._saved_enabled_states
              for converted in fresh_tools:
                  if converted.name in remembered:
                      converted.enabled = remembered[converted.name]

              self._tools_cache = fresh_tools
              logger.debug("Refreshed MCP tools cache", num_tools=len(fresh_tools))
          except Exception:
              logger.exception("Failed to refresh MCP tools cache")
              self._tools_cache = []