Skip to content

message_handler

Class info

Classes

Name Children Inherits
MCPMessageHandler
llmling_agent.mcp_server.message_handler
Custom message handler that bridges FastMCP to llmling-agent notifications.

    🛈 DocStrings

    FastMCP message handler for llmling-agent.

    MCPMessageHandler dataclass

    Custom message handler that bridges FastMCP to llmling-agent notifications.

    Source code in src/llmling_agent/mcp_server/message_handler.py
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    @dataclass
    class MCPMessageHandler:
        """Custom message handler that bridges FastMCP to llmling-agent notifications."""
    
        client: MCPClient
        """The MCP client instance."""
        tool_change_callback: Callable[[], Awaitable[None]] | None = None
        """Tool change callback."""
        prompt_change_callback: Callable[[], Awaitable[None]] | None = None
        """Prompt change callback."""
        resource_change_callback: Callable[[], Awaitable[None]] | None = None
        """Resource change callback."""
    
        async def __call__(
            self,
            message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
            | mcp.types.ServerNotification
            | Exception,
        ) -> None:
            """Handle FastMCP messages by dispatching to appropriate handlers."""
            from mcp.shared.session import RequestResponder
            import mcp.types
    
            await self.on_message(message)
            match message:
                # requests
                case RequestResponder():
                    await self.on_request(message)
                    # Handle specific requests
                    match message.request.root:
                        case mcp.types.PingRequest():
                            await self.on_ping(message.request.root)
                        case mcp.types.ListRootsRequest():
                            await self.on_list_roots(message.request.root)
                        case mcp.types.CreateMessageRequest():
                            await self.on_create_message(message.request.root)
    
                case mcp.types.ServerNotification():
                    await self.on_notification(message)
                    match message.root:
                        case mcp.types.CancelledNotification():
                            await self.on_cancelled(message.root)
                        case mcp.types.ProgressNotification():
                            await self.on_progress(message.root)
                        case mcp.types.LoggingMessageNotification():
                            await self.on_logging_message(message.root)
                        case mcp.types.ToolListChangedNotification():
                            await self.on_tool_list_changed(message.root)
                        case mcp.types.ResourceListChangedNotification():
                            await self.on_resource_list_changed(message.root)
                        case mcp.types.PromptListChangedNotification():
                            await self.on_prompt_list_changed(message.root)
                        case mcp.types.ResourceUpdatedNotification():
                            await self.on_resource_updated(message.root)
    
                case Exception():
                    await self.on_exception(message)
    
        async def on_message(
            self,
            message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
            | mcp.types.ServerNotification
            | Exception,
        ) -> None:
            """Handle generic messages."""
    
        async def on_request(
            self, message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
        ) -> None:
            """Handle requests."""
    
        async def on_notification(self, message: mcp.types.ServerNotification) -> None:
            """Handle server notifications."""
    
        async def on_tool_list_changed(self, message: mcp.types.ToolListChangedNotification) -> None:
            """Handle tool list changes."""
            logger.info("MCP tool list changed", message=message)
            # Call the tool change callback if provided
            if self.tool_change_callback:
                await self.tool_change_callback()
    
        async def on_resource_list_changed(
            self, message: mcp.types.ResourceListChangedNotification
        ) -> None:
            """Handle resource list changes."""
            logger.info("MCP resource list changed", message=message)
            # Call the resource change callback if provided
            if self.resource_change_callback:
                await self.resource_change_callback()
    
        async def on_resource_updated(self, message: mcp.types.ResourceUpdatedNotification) -> None:
            """Handle resource updates."""
            # ResourceUpdatedNotification has uri directly, not in params
            logger.info("MCP resource updated", uri=getattr(message, "uri", "unknown"))
    
        async def on_progress(self, message: mcp.types.ProgressNotification) -> None:
            """Handle progress notifications with proper context."""
            # Note: Progress notifications from MCP servers are now handled per-tool-call
            # with the contextual progress handler, so global notifications are ignored
    
        async def on_prompt_list_changed(
            self, message: mcp.types.PromptListChangedNotification
        ) -> None:
            """Handle prompt list changes."""
            logger.info("MCP prompt list changed", message=message)
            # Call the prompt change callback if provided
            if self.prompt_change_callback:
                await self.prompt_change_callback()
    
        async def on_cancelled(self, message: mcp.types.CancelledNotification) -> None:
            """Handle cancelled operations."""
            logger.info("MCP operation cancelled", message=message)
    
        async def on_logging_message(self, message: mcp.types.LoggingMessageNotification) -> None:
            """Handle server log messages."""
            # This is handled by _log_handler, but keep for completeness
    
        async def on_exception(self, message: Exception) -> None:
            """Handle exceptions."""
            logger.error("MCP client exception", error=message)
    
        async def on_ping(self, message: mcp.types.PingRequest) -> None:
            """Handle ping requests."""
    
        async def on_list_roots(self, message: mcp.types.ListRootsRequest) -> None:
            """Handle list roots requests."""
    
        async def on_create_message(self, message: mcp.types.CreateMessageRequest) -> None:
            """Handle create message requests."""
    

    client instance-attribute

    client: MCPClient
    

    The MCP client instance.

    prompt_change_callback class-attribute instance-attribute

    prompt_change_callback: Callable[[], Awaitable[None]] | None = None
    

    Prompt change callback.

    resource_change_callback class-attribute instance-attribute

    resource_change_callback: Callable[[], Awaitable[None]] | None = None
    

    Resource change callback.

    tool_change_callback class-attribute instance-attribute

    tool_change_callback: Callable[[], Awaitable[None]] | None = None
    

    Tool change callback.

    __call__ async

    __call__(
        message: RequestResponder[ServerRequest, ClientResult] | ServerNotification | Exception,
    ) -> None
    

    Handle FastMCP messages by dispatching to appropriate handlers.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    async def __call__(
        self,
        message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
        | mcp.types.ServerNotification
        | Exception,
    ) -> None:
        """Route an incoming FastMCP message to the matching ``on_*`` hooks.

        ``on_message`` is awaited for every message. Request responders then go to
        ``on_request``, server notifications to ``on_notification`` and exceptions to
        ``on_exception``. Requests and notifications are additionally forwarded to
        the hook for their concrete type when one exists.
        """
        from mcp.shared.session import RequestResponder
        import mcp.types

        await self.on_message(message)

        if isinstance(message, RequestResponder):
            await self.on_request(message)
            # Unwrap the concrete request; unlisted request types stop here.
            request = message.request.root
            if isinstance(request, mcp.types.PingRequest):
                await self.on_ping(request)
            elif isinstance(request, mcp.types.ListRootsRequest):
                await self.on_list_roots(request)
            elif isinstance(request, mcp.types.CreateMessageRequest):
                await self.on_create_message(request)

        elif isinstance(message, mcp.types.ServerNotification):
            await self.on_notification(message)
            # Unwrap the concrete notification; unlisted types stop here.
            notification = message.root
            if isinstance(notification, mcp.types.CancelledNotification):
                await self.on_cancelled(notification)
            elif isinstance(notification, mcp.types.ProgressNotification):
                await self.on_progress(notification)
            elif isinstance(notification, mcp.types.LoggingMessageNotification):
                await self.on_logging_message(notification)
            elif isinstance(notification, mcp.types.ToolListChangedNotification):
                await self.on_tool_list_changed(notification)
            elif isinstance(notification, mcp.types.ResourceListChangedNotification):
                await self.on_resource_list_changed(notification)
            elif isinstance(notification, mcp.types.PromptListChangedNotification):
                await self.on_prompt_list_changed(notification)
            elif isinstance(notification, mcp.types.ResourceUpdatedNotification):
                await self.on_resource_updated(notification)

        elif isinstance(message, Exception):
            await self.on_exception(message)
    

    on_cancelled async

    on_cancelled(message: CancelledNotification) -> None
    

    Handle cancelled operations.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    131
    132
    133
    async def on_cancelled(self, message: mcp.types.CancelledNotification) -> None:
        """Handle cancelled operations.

        Awaited by ``__call__`` when a server notification wraps a
        ``CancelledNotification``. Only logs the notification at info level.
        """
        logger.info("MCP operation cancelled", message=message)
    

    on_create_message async

    on_create_message(message: CreateMessageRequest) -> None
    

    Handle create message requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    149
    150
    async def on_create_message(self, message: mcp.types.CreateMessageRequest) -> None:
        """Handle create message requests.

        Awaited by ``__call__`` when a request wraps a ``CreateMessageRequest``.
        This implementation does nothing.
        """
    

    on_exception async

    on_exception(message: Exception) -> None
    

    Handle exceptions.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    139
    140
    141
    async def on_exception(self, message: Exception) -> None:
        """Handle exceptions.

        Awaited by ``__call__`` when the incoming message is an ``Exception``.
        Only logs it at error level; the exception is not re-raised here.
        """
        logger.error("MCP client exception", error=message)
    

    on_list_roots async

    on_list_roots(message: ListRootsRequest) -> None
    

    Handle list roots requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    146
    147
    async def on_list_roots(self, message: mcp.types.ListRootsRequest) -> None:
        """Handle list roots requests.

        Awaited by ``__call__`` when a request wraps a ``ListRootsRequest``.
        This implementation does nothing.
        """
    

    on_logging_message async

    on_logging_message(message: LoggingMessageNotification) -> None
    

    Handle server log messages.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    135
    136
    async def on_logging_message(self, message: mcp.types.LoggingMessageNotification) -> None:
        """Handle server log messages.

        Awaited by ``__call__`` when a server notification wraps a
        ``LoggingMessageNotification``. This implementation does nothing.
        """
    

    on_message async

    on_message(
        message: RequestResponder[ServerRequest, ClientResult] | ServerNotification | Exception,
    ) -> None
    

    Handle generic messages.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    80
    81
    82
    83
    84
    85
    86
    async def on_message(
        self,
        message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
        | mcp.types.ServerNotification
        | Exception,
    ) -> None:
        """Handle generic messages.

        Awaited by ``__call__`` for every incoming message, before any
        type-specific dispatch. This implementation does nothing.
        """
    

    on_notification async

    on_notification(message: ServerNotification) -> None
    

    Handle server notifications.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    93
    94
    async def on_notification(self, message: mcp.types.ServerNotification) -> None:
        """Handle server notifications.

        Awaited by ``__call__`` for every ``ServerNotification``, before the
        notification-specific hooks. This implementation does nothing.
        """
    

    on_ping async

    on_ping(message: PingRequest) -> None
    

    Handle ping requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    143
    144
    async def on_ping(self, message: mcp.types.PingRequest) -> None:
        """Handle ping requests.

        Awaited by ``__call__`` when a request wraps a ``PingRequest``.
        This implementation does nothing.
        """
    

    on_progress async

    on_progress(message: ProgressNotification) -> None
    

    Handle progress notifications with proper context.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    117
    118
    async def on_progress(self, message: mcp.types.ProgressNotification) -> None:
        """Handle progress notifications with proper context.

        Awaited by ``__call__`` when a server notification wraps a
        ``ProgressNotification``. This implementation does nothing.
        """
    

    on_prompt_list_changed async

    on_prompt_list_changed(message: PromptListChangedNotification) -> None
    

    Handle prompt list changes.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    122
    123
    124
    125
    126
    127
    128
    129
    async def on_prompt_list_changed(
        self, message: mcp.types.PromptListChangedNotification
    ) -> None:
        """Log a prompt-list change and notify the registered callback, if any."""
        logger.info("MCP prompt list changed", message=message)

        notify = self.prompt_change_callback
        if not notify:
            # No callback registered, so logging is all there is to do.
            return
        await notify()
    

    on_request async

    on_request(message: RequestResponder[ServerRequest, ClientResult]) -> None
    

    Handle requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    88
    89
    90
    91
    async def on_request(
        self, message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
    ) -> None:
        """Handle requests.

        Awaited by ``__call__`` for every ``RequestResponder``, before the
        request-specific hooks. This implementation does nothing.
        """
    

    on_resource_list_changed async

    on_resource_list_changed(message: ResourceListChangedNotification) -> None
    

    Handle resource list changes.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    103
    104
    105
    106
    107
    108
    109
    110
    async def on_resource_list_changed(
        self, message: mcp.types.ResourceListChangedNotification
    ) -> None:
        """Log a resource-list change and notify the registered callback, if any."""
        logger.info("MCP resource list changed", message=message)

        notify = self.resource_change_callback
        if not notify:
            # No callback registered, so logging is all there is to do.
            return
        await notify()
    

    on_resource_updated async

    on_resource_updated(message: ResourceUpdatedNotification) -> None
    

    Handle resource updates.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    112
    113
    114
    115
    async def on_resource_updated(self, message: mcp.types.ResourceUpdatedNotification) -> None:
        """Handle resource updates by logging the URI of the updated resource.

        The MCP schema carries the URI in ``message.params.uri``. A top-level
        ``uri`` attribute is still checked first so objects that expose it
        directly keep working. ``"unknown"`` is logged when neither is present.
        """
        uri = getattr(message, "uri", None)
        if uri is None:
            # Standard location: ResourceUpdatedNotification.params.uri
            uri = getattr(getattr(message, "params", None), "uri", None)
        logger.info("MCP resource updated", uri="unknown" if uri is None else str(uri))
    

    on_tool_list_changed async

    on_tool_list_changed(message: ToolListChangedNotification) -> None
    

    Handle tool list changes.

    Source code in src/llmling_agent/mcp_server/message_handler.py
     96
     97
     98
     99
    100
    101
    async def on_tool_list_changed(self, message: mcp.types.ToolListChangedNotification) -> None:
        """Log a tool-list change and notify the registered callback, if any."""
        logger.info("MCP tool list changed", message=message)

        notify = self.tool_change_callback
        if not notify:
            # No callback registered, so logging is all there is to do.
            return
        await notify()