Skip to content

message_handler

Class info

Classes

Name Children Inherits
MCPMessageHandler
llmling_agent.mcp_server.message_handler
Custom message handler that bridges FastMCP to llmling-agent notifications.

    🛈 DocStrings

    FastMCP message handler for llmling-agent.

    MCPMessageHandler

    Custom message handler that bridges FastMCP to llmling-agent notifications.

    Source code in src/llmling_agent/mcp_server/message_handler.py
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    class MCPMessageHandler:
        """Custom message handler that bridges FastMCP to llmling-agent notifications."""
    
        def __init__(self, client: MCPClient) -> None:
            self.client = client
    
        async def __call__(
            self,
            message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
            | mcp.types.ServerNotification
            | Exception,
        ) -> None:
            """Handle FastMCP messages by dispatching to appropriate handlers."""
            return await self.dispatch(message)
    
        async def dispatch(
            self,
            message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
            | mcp.types.ServerNotification
            | Exception,
        ) -> None:
            """Main dispatch method called by FastMCP."""
            # Handle all messages
            await self.on_message(message)
    
            # Import at runtime to avoid circular imports and lazy load FastMCP
            from mcp.shared.session import RequestResponder
            import mcp.types
    
            match message:
                # requests
                case RequestResponder():
                    await self.on_request(message)
                    # Handle specific requests
                    match message.request.root:
                        case mcp.types.PingRequest():
                            await self.on_ping(message.request.root)
                        case mcp.types.ListRootsRequest():
                            await self.on_list_roots(message.request.root)
                        case mcp.types.CreateMessageRequest():
                            await self.on_create_message(message.request.root)
    
                # notifications
                case mcp.types.ServerNotification():
                    await self.on_notification(message)
                    # Handle specific notifications
                    match message.root:
                        case mcp.types.CancelledNotification():
                            await self.on_cancelled(message.root)
                        case mcp.types.ProgressNotification():
                            await self.on_progress(message.root)
                        case mcp.types.LoggingMessageNotification():
                            await self.on_logging_message(message.root)
                        case mcp.types.ToolListChangedNotification():
                            await self.on_tool_list_changed(message.root)
                        case mcp.types.ResourceListChangedNotification():
                            await self.on_resource_list_changed(message.root)
                        case mcp.types.PromptListChangedNotification():
                            await self.on_prompt_list_changed(message.root)
                        case mcp.types.ResourceUpdatedNotification():
                            await self.on_resource_updated(message.root)
    
                case Exception():
                    await self.on_exception(message)
    
        async def on_message(
            self,
            message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
            | mcp.types.ServerNotification
            | Exception,
        ) -> None:
            """Handle generic messages."""
    
        async def on_request(
            self, message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
        ) -> None:
            """Handle requests."""
    
        async def on_notification(self, message: mcp.types.ServerNotification) -> None:
            """Handle server notifications."""
    
        async def on_tool_list_changed(
            self, message: mcp.types.ToolListChangedNotification
        ) -> None:
            """Handle tool list changes by refreshing tools."""
            logger.info("MCP tool list changed: %s", message)
            # Schedule async refresh - use create_task to avoid blocking
            task = asyncio.create_task(self.client._refresh_tools())
            # Store reference to avoid warning about unawaited task
            task.add_done_callback(
                lambda t: t.exception() if t.done() and t.exception() else None
            )
    
        async def on_resource_list_changed(
            self, message: mcp.types.ResourceListChangedNotification
        ) -> None:
            """Handle resource list changes."""
            logger.info("MCP resource list changed: %s", message)
    
        async def on_resource_updated(
            self, message: mcp.types.ResourceUpdatedNotification
        ) -> None:
            """Handle resource updates."""
            # ResourceUpdatedNotification has uri directly, not in params
            logger.info("MCP resource updated: %s", getattr(message, "uri", "unknown"))
    
        async def on_progress(self, message: mcp.types.ProgressNotification) -> None:
            """Handle progress notifications with proper context."""
            if self.client._progress_handler:
                # Use stored tool execution context - handle both coroutines and awaitables
                try:
                    # ProgressNotification has params attribute containing the data
                    params = getattr(message, "params", message)
                    awaitable = self.client._progress_handler(
                        self.client._current_tool_name or "",
                        self.client._current_tool_call_id or "",
                        self.client._current_tool_input or {},
                        getattr(params, "progress", 0.0),
                        getattr(params, "total", None),
                        getattr(params, "message", None),
                    )
                    # Use ensure_future to handle both coroutines and awaitables
                    task = asyncio.ensure_future(awaitable)
                    # Store reference to avoid warning about unawaited task
                    task.add_done_callback(
                        lambda t: t.exception() if t.done() and t.exception() else None
                    )
                except Exception:
                    logger.exception("Failed to handle progress notification")
    
        async def on_prompt_list_changed(
            self, message: mcp.types.PromptListChangedNotification
        ) -> None:
            """Handle prompt list changes."""
            logger.info("MCP prompt list changed: %s", message)
    
        async def on_cancelled(self, message: mcp.types.CancelledNotification) -> None:
            """Handle cancelled operations."""
            logger.info("MCP operation cancelled: %s", message)
    
        async def on_logging_message(
            self, message: mcp.types.LoggingMessageNotification
        ) -> None:
            """Handle server log messages."""
            # This is handled by _log_handler, but keep for completeness
    
        async def on_exception(self, message: Exception) -> None:
            """Handle exceptions."""
            logger.error("MCP client exception: %s", message)
    
        async def on_ping(self, message: mcp.types.PingRequest) -> None:
            """Handle ping requests."""
    
        async def on_list_roots(self, message: mcp.types.ListRootsRequest) -> None:
            """Handle list roots requests."""
    
        async def on_create_message(self, message: mcp.types.CreateMessageRequest) -> None:
            """Handle create message requests."""
    

    __call__ async

    __call__(
        message: RequestResponder[ServerRequest, ClientResult]
        | ServerNotification
        | Exception,
    ) -> None
    

    Handle FastMCP messages by dispatching to appropriate handlers.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    26
    27
    28
    29
    30
    31
    32
    33
    async def __call__(
        self,
        message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
        | mcp.types.ServerNotification
        | Exception,
    ) -> None:
        """Handle FastMCP messages by dispatching to appropriate handlers."""
        return await self.dispatch(message)
    

    dispatch async

    dispatch(
        message: RequestResponder[ServerRequest, ClientResult]
        | ServerNotification
        | Exception,
    ) -> None
    

    Main dispatch method called by FastMCP.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    async def dispatch(
        self,
        message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
        | mcp.types.ServerNotification
        | Exception,
    ) -> None:
        """Main dispatch method called by FastMCP."""
        # Handle all messages
        await self.on_message(message)
    
        # Import at runtime to avoid circular imports and lazy load FastMCP
        from mcp.shared.session import RequestResponder
        import mcp.types
    
        match message:
            # requests
            case RequestResponder():
                await self.on_request(message)
                # Handle specific requests
                match message.request.root:
                    case mcp.types.PingRequest():
                        await self.on_ping(message.request.root)
                    case mcp.types.ListRootsRequest():
                        await self.on_list_roots(message.request.root)
                    case mcp.types.CreateMessageRequest():
                        await self.on_create_message(message.request.root)
    
            # notifications
            case mcp.types.ServerNotification():
                await self.on_notification(message)
                # Handle specific notifications
                match message.root:
                    case mcp.types.CancelledNotification():
                        await self.on_cancelled(message.root)
                    case mcp.types.ProgressNotification():
                        await self.on_progress(message.root)
                    case mcp.types.LoggingMessageNotification():
                        await self.on_logging_message(message.root)
                    case mcp.types.ToolListChangedNotification():
                        await self.on_tool_list_changed(message.root)
                    case mcp.types.ResourceListChangedNotification():
                        await self.on_resource_list_changed(message.root)
                    case mcp.types.PromptListChangedNotification():
                        await self.on_prompt_list_changed(message.root)
                    case mcp.types.ResourceUpdatedNotification():
                        await self.on_resource_updated(message.root)
    
            case Exception():
                await self.on_exception(message)
    

    on_cancelled async

    on_cancelled(message: CancelledNotification) -> None
    

    Handle cancelled operations.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    156
    157
    158
    async def on_cancelled(self, message: mcp.types.CancelledNotification) -> None:
        """Handle cancelled operations."""
        logger.info("MCP operation cancelled: %s", message)
    

    on_create_message async

    on_create_message(message: CreateMessageRequest) -> None
    

    Handle create message requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    176
    177
    async def on_create_message(self, message: mcp.types.CreateMessageRequest) -> None:
        """Handle create message requests."""
    

    on_exception async

    on_exception(message: Exception) -> None
    

    Handle exceptions.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    166
    167
    168
    async def on_exception(self, message: Exception) -> None:
        """Handle exceptions."""
        logger.error("MCP client exception: %s", message)
    

    on_list_roots async

    on_list_roots(message: ListRootsRequest) -> None
    

    Handle list roots requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    173
    174
    async def on_list_roots(self, message: mcp.types.ListRootsRequest) -> None:
        """Handle list roots requests."""
    

    on_logging_message async

    on_logging_message(message: LoggingMessageNotification) -> None
    

    Handle server log messages.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    160
    161
    162
    163
    async def on_logging_message(
        self, message: mcp.types.LoggingMessageNotification
    ) -> None:
        """Handle server log messages."""
    

    on_message async

    on_message(
        message: RequestResponder[ServerRequest, ClientResult]
        | ServerNotification
        | Exception,
    ) -> None
    

    Handle generic messages.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    85
    86
    87
    88
    89
    90
    91
    async def on_message(
        self,
        message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
        | mcp.types.ServerNotification
        | Exception,
    ) -> None:
        """Handle generic messages."""
    

    on_notification async

    on_notification(message: ServerNotification) -> None
    

    Handle server notifications.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    98
    99
    async def on_notification(self, message: mcp.types.ServerNotification) -> None:
        """Handle server notifications."""
    

    on_ping async

    on_ping(message: PingRequest) -> None
    

    Handle ping requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    170
    171
    async def on_ping(self, message: mcp.types.PingRequest) -> None:
        """Handle ping requests."""
    

    on_progress async

    on_progress(message: ProgressNotification) -> None
    

    Handle progress notifications with proper context.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    async def on_progress(self, message: mcp.types.ProgressNotification) -> None:
        """Handle progress notifications with proper context."""
        if self.client._progress_handler:
            # Use stored tool execution context - handle both coroutines and awaitables
            try:
                # ProgressNotification has params attribute containing the data
                params = getattr(message, "params", message)
                awaitable = self.client._progress_handler(
                    self.client._current_tool_name or "",
                    self.client._current_tool_call_id or "",
                    self.client._current_tool_input or {},
                    getattr(params, "progress", 0.0),
                    getattr(params, "total", None),
                    getattr(params, "message", None),
                )
                # Use ensure_future to handle both coroutines and awaitables
                task = asyncio.ensure_future(awaitable)
                # Store reference to avoid warning about unawaited task
                task.add_done_callback(
                    lambda t: t.exception() if t.done() and t.exception() else None
                )
            except Exception:
                logger.exception("Failed to handle progress notification")
    

    on_prompt_list_changed async

    on_prompt_list_changed(message: PromptListChangedNotification) -> None
    

    Handle prompt list changes.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    150
    151
    152
    153
    154
    async def on_prompt_list_changed(
        self, message: mcp.types.PromptListChangedNotification
    ) -> None:
        """Handle prompt list changes."""
        logger.info("MCP prompt list changed: %s", message)
    

    on_request async

    on_request(message: RequestResponder[ServerRequest, ClientResult]) -> None
    

    Handle requests.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    93
    94
    95
    96
    async def on_request(
        self, message: RequestResponder[mcp.types.ServerRequest, mcp.types.ClientResult]
    ) -> None:
        """Handle requests."""
    

    on_resource_list_changed async

    on_resource_list_changed(message: ResourceListChangedNotification) -> None
    

    Handle resource list changes.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    113
    114
    115
    116
    117
    async def on_resource_list_changed(
        self, message: mcp.types.ResourceListChangedNotification
    ) -> None:
        """Handle resource list changes."""
        logger.info("MCP resource list changed: %s", message)
    

    on_resource_updated async

    on_resource_updated(message: ResourceUpdatedNotification) -> None
    

    Handle resource updates.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    119
    120
    121
    122
    123
    124
    async def on_resource_updated(
        self, message: mcp.types.ResourceUpdatedNotification
    ) -> None:
        """Handle resource updates."""
        # ResourceUpdatedNotification has uri directly, not in params
        logger.info("MCP resource updated: %s", getattr(message, "uri", "unknown"))
    

    on_tool_list_changed async

    on_tool_list_changed(message: ToolListChangedNotification) -> None
    

    Handle tool list changes by refreshing tools.

    Source code in src/llmling_agent/mcp_server/message_handler.py
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    async def on_tool_list_changed(
        self, message: mcp.types.ToolListChangedNotification
    ) -> None:
        """Handle tool list changes by refreshing tools."""
        logger.info("MCP tool list changed: %s", message)
        # Schedule async refresh - use create_task to avoid blocking
        task = asyncio.create_task(self.client._refresh_tools())
        # Store reference to avoid warning about unawaited task
        task.add_done_callback(
            lambda t: t.exception() if t.done() and t.exception() else None
        )