Skip to content

message_handler

Class info

Classes

Name Children Inherits
MCPMessageHandler
llmling_agent.mcp_server.message_handler
Custom message handler that bridges FastMCP to llmling-agent notifications.

    🛈 DocStrings

    FastMCP message handler for llmling-agent.

    MCPMessageHandler

    Custom message handler that bridges FastMCP to llmling-agent notifications.

    Source code in src/llmling_agent/mcp_server/message_handler.py
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    class MCPMessageHandler:
        """Custom message handler that bridges FastMCP to llmling-agent notifications."""
    
        def __init__(self, client: MCPClient) -> None:
            self.client = client
    
        async def __call__(
            self,
            message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
            | mcp.types.ServerNotification
            | Exception,
        ) -> None:
            """Handle FastMCP messages by dispatching to appropriate handlers."""
            from mcp.shared.session import RequestResponder
            import mcp.types
    
            await self.on_message(message)
            match message:
                # requests
                case RequestResponder():
                    await self.on_request(message)
                    # Handle specific requests
                    match message.request.root:
                        case mcp.types.PingRequest():
                            await self.on_ping(message.request.root)
                        case mcp.types.ListRootsRequest():
                            await self.on_list_roots(message.request.root)
                        case mcp.types.CreateMessageRequest():
                            await self.on_create_message(message.request.root)
    
                # notifications
                case mcp.types.ServerNotification():
                    await self.on_notification(message)
                    # Handle specific notifications
                    match message.root:
                        case mcp.types.CancelledNotification():
                            await self.on_cancelled(message.root)
                        case mcp.types.ProgressNotification():
                            await self.on_progress(message.root)
                        case mcp.types.LoggingMessageNotification():
                            await self.on_logging_message(message.root)
                        case mcp.types.ToolListChangedNotification():
                            await self.on_tool_list_changed(message.root)
                        case mcp.types.ResourceListChangedNotification():
                            await self.on_resource_list_changed(message.root)
                        case mcp.types.PromptListChangedNotification():
                            await self.on_prompt_list_changed(message.root)
                        case mcp.types.ResourceUpdatedNotification():
                            await self.on_resource_updated(message.root)
    
                case Exception():
                    await self.on_exception(message)
    
        async def on_message(
            self,
            message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
            | mcp.types.ServerNotification
            | Exception,
        ) -> None:
            """Handle generic messages."""
    
        async def on_request(
            self, message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
        ) -> None:
            """Handle requests."""
    
        async def on_notification(self, message: mcp.types.ServerNotification) -> None:
            """Handle server notifications."""
    
        async def on_tool_list_changed(
            self, message: mcp.types.ToolListChangedNotification
        ) -> None:
            """Handle tool list changes by refreshing tools."""
            logger.info("MCP tool list changed", message=message)
            # Schedule async refresh - use create_task to avoid blocking
            task = asyncio.create_task(self.client._refresh_tools())
            # Store reference to avoid warning about unawaited task
            task.add_done_callback(
                lambda t: t.exception() if t.done() and t.exception() else None
            )
    
        async def on_resource_list_changed(
            self, message: mcp.types.ResourceListChangedNotification
        ) -> None:
            """Handle resource list changes."""
            logger.info("MCP resource list changed", message=message)
    
        async def on_resource_updated(
            self, message: mcp.types.ResourceUpdatedNotification
        ) -> None:
            """Handle resource updates."""
            # ResourceUpdatedNotification has uri directly, not in params
            logger.info("MCP resource updated", uri=getattr(message, "uri", "unknown"))
    
        async def on_progress(self, message: mcp.types.ProgressNotification) -> None:
            """Handle progress notifications with proper context."""
            # Note: Progress notifications from MCP servers are now handled per-tool-call
            # with the contextual progress handler, so global notifications are ignored
    
        async def on_prompt_list_changed(
            self, message: mcp.types.PromptListChangedNotification
        ) -> None:
            """Handle prompt list changes."""
            logger.info("MCP prompt list changed", message=message)
    
        async def on_cancelled(self, message: mcp.types.CancelledNotification) -> None:
            """Handle cancelled operations."""
            logger.info("MCP operation cancelled", message=message)
    
        async def on_logging_message(
            self, message: mcp.types.LoggingMessageNotification
        ) -> None:
            """Handle server log messages."""
            # This is handled by _log_handler, but keep for completeness
    
        async def on_exception(self, message: Exception) -> None:
            """Handle exceptions."""
            logger.error("MCP client exception", error=message)
    
        async def on_ping(self, message: mcp.types.PingRequest) -> None:
            """Handle ping requests."""
    
        async def on_list_roots(self, message: mcp.types.ListRootsRequest) -> None:
            """Handle list roots requests."""
    
        async def on_create_message(self, message: mcp.types.CreateMessageRequest) -> None:
            """Handle create message requests."""
    

    __call__ async

    __call__(
        message: RequestResponder[ServerRequest, ClientResult]
        | ServerNotification
        | Exception,
    ) -> None
    

    Handle FastMCP messages by dispatching to appropriate handlers.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    async def __call__(
        self,
        message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
        | mcp.types.ServerNotification
        | Exception,
    ) -> None:
        """Handle FastMCP messages by dispatching to appropriate handlers."""
        from mcp.shared.session import RequestResponder
        import mcp.types
    
        await self.on_message(message)
        match message:
            # requests
            case RequestResponder():
                await self.on_request(message)
                # Handle specific requests
                match message.request.root:
                    case mcp.types.PingRequest():
                        await self.on_ping(message.request.root)
                    case mcp.types.ListRootsRequest():
                        await self.on_list_roots(message.request.root)
                    case mcp.types.CreateMessageRequest():
                        await self.on_create_message(message.request.root)
    
            # notifications
            case mcp.types.ServerNotification():
                await self.on_notification(message)
                # Handle specific notifications
                match message.root:
                    case mcp.types.CancelledNotification():
                        await self.on_cancelled(message.root)
                    case mcp.types.ProgressNotification():
                        await self.on_progress(message.root)
                    case mcp.types.LoggingMessageNotification():
                        await self.on_logging_message(message.root)
                    case mcp.types.ToolListChangedNotification():
                        await self.on_tool_list_changed(message.root)
                    case mcp.types.ResourceListChangedNotification():
                        await self.on_resource_list_changed(message.root)
                    case mcp.types.PromptListChangedNotification():
                        await self.on_prompt_list_changed(message.root)
                    case mcp.types.ResourceUpdatedNotification():
                        await self.on_resource_updated(message.root)
    
            case Exception():
                await self.on_exception(message)
    

    on_cancelled async

    on_cancelled(message: CancelledNotification) -> None
    

    Handle cancelled operations.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    125
    126
    127
    async def on_cancelled(self, message: mcp.types.CancelledNotification) -> None:
        """Log that the server cancelled an operation.

        Args:
            message: The cancellation notification, attached to the log entry.
        """
        logger.info(
            "MCP operation cancelled",
            message=message,
        )
    

    on_create_message async

    on_create_message(message: CreateMessageRequest) -> None
    

    Handle create message requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    145
    146
    async def on_create_message(self, message: mcp.types.CreateMessageRequest) -> None:
        """Hook for create-message requests; performs no work.

        Args:
            message: The create-message request (unused).
        """
        return None
    

    on_exception async

    on_exception(message: Exception) -> None
    

    Handle exceptions.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    135
    136
    137
    async def on_exception(self, message: Exception) -> None:
        """Log an exception delivered through the message stream.

        Args:
            message: The exception, attached to the log entry as ``error``.
        """
        logger.error(
            "MCP client exception",
            error=message,
        )
    

    on_list_roots async

    on_list_roots(message: ListRootsRequest) -> None
    

    Handle list roots requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    142
    143
    async def on_list_roots(self, message: mcp.types.ListRootsRequest) -> None:
        """Hook for list-roots requests; performs no work.

        Args:
            message: The list-roots request (unused).
        """
        return None
    

    on_logging_message async

    on_logging_message(message: LoggingMessageNotification) -> None
    

    Handle server log messages.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    129
    130
    131
    132
    async def on_logging_message(
        self,
        message: mcp.types.LoggingMessageNotification,
    ) -> None:
        """Hook for server log-message notifications; performs no work.

        Args:
            message: The logging notification (unused).
        """
        return None
    

    on_message async

    on_message(
        message: RequestResponder[ServerRequest, ClientResult]
        | ServerNotification
        | Exception,
    ) -> None
    

    Handle generic messages.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    73
    74
    75
    76
    77
    78
    79
    async def on_message(
        self,
        message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
        | mcp.types.ServerNotification
        | Exception,
    ) -> None:
        """Generic hook accepting any message kind; performs no work.

        Args:
            message: A request responder, server notification, or exception
                (unused).
        """
        return None
    

    on_notification async

    on_notification(message: ServerNotification) -> None
    

    Handle server notifications.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    86
    87
    async def on_notification(self, message: mcp.types.ServerNotification) -> None:
        """Generic hook for server notifications; performs no work.

        Args:
            message: The server notification (unused).
        """
        return None
    

    on_ping async

    on_ping(message: PingRequest) -> None
    

    Handle ping requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    139
    140
    async def on_ping(self, message: mcp.types.PingRequest) -> None:
        """Hook for ping requests; performs no work.

        Args:
            message: The ping request (unused).
        """
        return None
    

    on_progress async

    on_progress(message: ProgressNotification) -> None
    

    Handle progress notifications with proper context.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    114
    115
    async def on_progress(self, message: mcp.types.ProgressNotification) -> None:
        """Hook for progress notifications; performs no work.

        Args:
            message: The progress notification (unused).
        """
        return None
    

    on_prompt_list_changed async

    on_prompt_list_changed(message: PromptListChangedNotification) -> None
    

    Handle prompt list changes.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    119
    120
    121
    122
    123
    async def on_prompt_list_changed(
        self,
        message: mcp.types.PromptListChangedNotification,
    ) -> None:
        """Log that the server's prompt list changed.

        Args:
            message: The notification, attached to the log entry.
        """
        logger.info(
            "MCP prompt list changed",
            message=message,
        )
    

    on_request async

    on_request(message: RequestResponder[ServerRequest, ClientResult]) -> None
    

    Handle requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    81
    82
    83
    84
    async def on_request(
        self,
        message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult],
    ) -> None:
        """Generic hook for server requests; performs no work.

        Args:
            message: The request responder (unused).
        """
        return None
    

    on_resource_list_changed async

    on_resource_list_changed(message: ResourceListChangedNotification) -> None
    

    Handle resource list changes.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    101
    102
    103
    104
    105
    async def on_resource_list_changed(
        self,
        message: mcp.types.ResourceListChangedNotification,
    ) -> None:
        """Log that the server's resource list changed.

        Args:
            message: The notification, attached to the log entry.
        """
        logger.info(
            "MCP resource list changed",
            message=message,
        )
    

    on_resource_updated async

    on_resource_updated(message: ResourceUpdatedNotification) -> None
    

    Handle resource updates.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    107
    108
    109
    110
    111
    112
    async def on_resource_updated(
        self, message: mcp.types.ResourceUpdatedNotification
    ) -> None:
        """Handle resource updates by logging the affected URI.

        Args:
            message: The notification; the URI is read from its ``params``.
        """
        # The URI lives on the notification's params; the direct attribute
        # lookup is kept as a fallback for objects that expose it there.
        params = getattr(message, "params", None)
        uri = getattr(params, "uri", None)
        if uri is None:
            uri = getattr(message, "uri", "unknown")
        logger.info("MCP resource updated", uri=uri)
    

    on_tool_list_changed async

    on_tool_list_changed(message: ToolListChangedNotification) -> None
    

    Handle tool list changes by refreshing tools.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    async def on_tool_list_changed(
        self, message: mcp.types.ToolListChangedNotification
    ) -> None:
        """Handle tool list changes by refreshing tools.

        The refresh runs as a background task so message handling is not
        blocked while the client reloads its tools.

        Args:
            message: The notification, attached to the log entry.
        """
        logger.info("MCP tool list changed", message=message)

        # The event loop only keeps weak references to tasks, so in-flight
        # refreshes are held on the instance until they complete. The set is
        # created lazily so instances built without it keep working.
        pending = getattr(self, "_background_tasks", None)
        if pending is None:
            pending = self._background_tasks = set()

        def _finished(task: asyncio.Task[object]) -> None:
            """Release the finished task and report its failure, if any."""
            pending.discard(task)
            # Task.exception() raises CancelledError on a cancelled task, so
            # cancellation must be checked first.
            if task.cancelled():
                return
            error = task.exception()
            if error is not None:
                logger.error("MCP tool refresh failed", error=error)

        task = asyncio.create_task(self.client._refresh_tools())
        pending.add(task)
        task.add_done_callback(_finished)