MessageNode

Subclasses

    Agent (llmling_agent.agent.agent)
        Agent for AI-powered interaction with LLMling resources and tools.
    BaseTeam (llmling_agent.delegation.base_team)
        Base class for Team and TeamRun.

    Base classes

    MessageEmitter (llmling_agent.messaging.messageemitter)
        Base class for all message processing nodes.
    Generic (typing)
        Abstract base class for generic types.

    ⋔ Inheritance diagram

    graph TD
      MessageNode["messagenode.MessageNode"]
      MessageEmitter["messageemitter.MessageEmitter"]
      ABC["abc.ABC"]
      Object["builtins.object"]
      Generic["typing.Generic"]
      MessageEmitter --> MessageNode
      ABC --> MessageEmitter
      Object --> ABC
      Generic --> MessageEmitter
      Object --> Generic
      Generic --> MessageNode

    🛈 DocStrings

    Bases: MessageEmitter[TDeps, TResult]

    Base class for all message processing nodes.

    Source code in src/llmling_agent/messaging/messagenode.py
    class MessageNode[TDeps, TResult](MessageEmitter[TDeps, TResult]):
        """Base class for all message processing nodes."""
    
        tool_used = Signal(ToolCallInfo)
        """Signal emitted when node uses a tool."""
    
        async def pre_run(
            self,
            *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
        ) -> tuple[ChatMessage[Any], list[Content | str]]:
            """Hook to prepare a MessgeNode run call.
    
            Args:
                *prompt: The prompt(s) to prepare.
    
            Returns:
                A tuple of:
                    - Either the incoming message, or a constructed incoming message based
                      on the prompt(s).
                    - A list of prompts to be sent to the model.
            """
            if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                user_msg = prompt[0]
                prompts = await convert_prompts([user_msg.content])
                # Update received message's chain to show it came through its source
                user_msg = user_msg.forwarded(prompt[0]).to_request()
                # clear cost info to avoid double-counting
                final_prompt = "\n\n".join(str(p) for p in prompts)
            else:
                prompts = await convert_prompts(prompt)
                final_prompt = "\n\n".join(str(p) for p in prompts)
                # use format_prompts?
                user_msg = ChatMessage[str](
                    content=final_prompt,
                    role="user",
                    conversation_id=str(uuid4()),
                    parts=[
                        UserPromptPart(content=[content_to_pydantic_ai(i) for i in prompts])
                    ],
                )
            self.message_received.emit(user_msg)
            self.context.current_prompt = final_prompt
            return user_msg, prompts
    
        # async def post_run(
        #     self,
        #     message: ChatMessage[TResult],
        #     previous_message: ChatMessage[Any] | None,
        #     wait_for_connections: bool | None = None,
        # ) -> ChatMessage[Any]:
        #     # For chain processing, update the response's chain
        #     if previous_message:
        #         message = message.forwarded(previous_message)
        #         conversation_id = previous_message.conversation_id
        #     else:
        #         conversation_id = str(uuid4())
        #     # Set conversation_id on response message
        #     message = replace(message, conversation_id=conversation_id)
        #     self.message_sent.emit(message)
        #     await self.log_message(message)
        #     await self.connections.route_message(message, wait=wait_for_connections)
        #     return message
    
        # @overload
        # async def run(
        #     self,
        #     *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
        #     wait_for_connections: bool | None = None,
        #     store_history: bool = True,
        #     output_type: None,
        #     **kwargs: Any,
        # ) -> ChatMessage[TResult]: ...
    
        # @overload
        # async def run[OutputTypeT](
        #     self,
        #     *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
        #     wait_for_connections: bool | None = None,
        #     store_history: bool = True,
        #     output_type: type[OutputTypeT],
        #     **kwargs: Any,
        # ) -> ChatMessage[OutputTypeT]: ...
    
        @method_spawner
        async def run[OutputTypeT](
            self,
            *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
            wait_for_connections: bool | None = None,
            store_history: bool = True,
            output_type: type[OutputTypeT] | None = None,
            **kwargs: Any,
        ) -> ChatMessage[Any]:
            """Execute node with prompts and handle message routing.
    
            Args:
                prompt: Input prompts
                wait_for_connections: Whether to wait for forwarded messages
                store_history: Whether to store in conversation history
                output_type: Type of output to expect
                **kwargs: Additional arguments for _run
            """
            from llmling_agent import Agent
    
            user_msg, prompts = await self.pre_run(*prompt)
            message = await self._run(
                *prompts,
                store_history=store_history,
                conversation_id=user_msg.conversation_id,
                output_type=output_type,
                **kwargs,
            )
    
            # For chain processing, update the response's chain
            if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                message = message.forwarded(prompt[0])
    
            if store_history and isinstance(self, Agent):
                self.conversation.add_chat_messages([user_msg, message])
            self.message_sent.emit(message)
            await self.connections.route_message(message, wait=wait_for_connections)
            return message
    
        @abstractmethod
        async def get_stats(self) -> MessageStats | AggregatedMessageStats:
            """Get message statistics for this node."""
    
        @abstractmethod
        def run_iter(
            self,
            *prompts: Any,
            **kwargs: Any,
        ) -> AsyncIterator[ChatMessage[Any]]:
            """Yield messages during execution."""
    
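    Putting the pieces together: a concrete node supplies _run (which run calls with the converted prompts), run_iter, and get_stats. The following is a minimal sketch, not an official example: EchoNode is hypothetical, and the local ChatMessage import path is an assumption.

    from collections.abc import AsyncIterator
    from typing import Any

    from llmling_agent.messaging.messagenode import MessageNode


    class EchoNode(MessageNode[None, str]):
        """Hypothetical node that echoes the joined prompts back."""

        async def _run(self, *prompts: Any, **kwargs: Any):
            from llmling_agent import ChatMessage  # assumed import path

            # run() hands us the prompt list produced by pre_run()
            text = "\n\n".join(str(p) for p in prompts)
            return ChatMessage[str](content=text, role="assistant")

        async def run_iter(self, *prompts: Any, **kwargs: Any) -> AsyncIterator[Any]:
            # Stream the final message as a one-element iterator
            yield await self.run(*prompts, **kwargs)

        async def get_stats(self):
            # Real nodes return MessageStats / AggregatedMessageStats here
            raise NotImplementedError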

    tool_used class-attribute instance-attribute

    tool_used = Signal(ToolCallInfo)
    

    Signal emitted when node uses a tool.
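
    A short sketch of observing tool calls, assuming Signal follows a psygnal-style connect()/emit() API; watch_tools is a hypothetical helper:

    from llmling_agent.tools import ToolCallInfo  # assumed import path


    def watch_tools(node) -> None:
        """Attach a tool-call listener to any concrete MessageNode."""

        def log_tool_call(info: ToolCallInfo) -> None:
            # info describes the tool call the node just made
            print(f"tool call: {info}")

        node.tool_used.connect(log_tool_call)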

    get_stats abstractmethod async

    Get message statistics for this node.

    Source code in src/llmling_agent/messaging/messagenode.py
    @abstractmethod
    async def get_stats(self) -> MessageStats | AggregatedMessageStats:
        """Get message statistics for this node."""
    

    pre_run async

    pre_run(
        *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
    ) -> tuple[ChatMessage[Any], list[Content | str]]
    

    Hook to prepare a MessageNode run call.

    Parameters:

        *prompt (AnyPromptType | Image | PathLike[str] | ChatMessage, default ()):
            The prompt(s) to prepare.

    Returns:

        tuple[ChatMessage[Any], list[Content | str]]:
            A tuple of:
                - either the incoming message, or a constructed incoming message based on the prompt(s)
                - a list of prompts to be sent to the model

    Source code in src/llmling_agent/messaging/messagenode.py
    async def pre_run(
        self,
        *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
    ) -> tuple[ChatMessage[Any], list[Content | str]]:
        """Hook to prepare a MessgeNode run call.
    
        Args:
            *prompt: The prompt(s) to prepare.
    
        Returns:
            A tuple of:
                - Either the incoming message, or a constructed incoming message based
                  on the prompt(s).
                - A list of prompts to be sent to the model.
        """
        if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
            user_msg = prompt[0]
            prompts = await convert_prompts([user_msg.content])
            # Update received message's chain to show it came through its source
            user_msg = user_msg.forwarded(prompt[0]).to_request()
            # clear cost info to avoid double-counting
            final_prompt = "\n\n".join(str(p) for p in prompts)
        else:
            prompts = await convert_prompts(prompt)
            final_prompt = "\n\n".join(str(p) for p in prompts)
            # use format_prompts?
            user_msg = ChatMessage[str](
                content=final_prompt,
                role="user",
                conversation_id=str(uuid4()),
                parts=[
                    UserPromptPart(content=[content_to_pydantic_ai(i) for i in prompts])
                ],
            )
        self.message_received.emit(user_msg)
        self.context.current_prompt = final_prompt
        return user_msg, prompts
    
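    pre_run is normally invoked for you by run, but it can be called directly to inspect what a node will receive. A small usage sketch, where node stands for any concrete MessageNode:

    async def inspect(node) -> None:
        # A plain string prompt is converted and wrapped in a new user message;
        # this also emits message_received and sets context.current_prompt
        user_msg, prompts = await node.pre_run("Summarize the latest report")
        print(user_msg.role)             # "user"
        print(user_msg.conversation_id)  # freshly generated UUID string
        print(len(prompts))              # converted prompts later passed to _run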

    run async

    run(
        *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
        wait_for_connections: bool | None = None,
        store_history: bool = True,
        output_type: type[OutputTypeT] | None = None,
        **kwargs: Any,
    ) -> ChatMessage[Any]
    

    Execute node with prompts and handle message routing.

    Parameters:

        prompt (AnyPromptType | Image | PathLike[str] | ChatMessage, default ()):
            Input prompts.
        wait_for_connections (bool | None, default None):
            Whether to wait for forwarded messages.
        store_history (bool, default True):
            Whether to store in conversation history.
        output_type (type[OutputTypeT] | None, default None):
            Type of output to expect.
        **kwargs (Any, default {}):
            Additional arguments for _run.
    Source code in src/llmling_agent/messaging/messagenode.py
    @method_spawner
    async def run[OutputTypeT](
        self,
        *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
        wait_for_connections: bool | None = None,
        store_history: bool = True,
        output_type: type[OutputTypeT] | None = None,
        **kwargs: Any,
    ) -> ChatMessage[Any]:
        """Execute node with prompts and handle message routing.
    
        Args:
            prompt: Input prompts
            wait_for_connections: Whether to wait for forwarded messages
            store_history: Whether to store in conversation history
            output_type: Type of output to expect
            **kwargs: Additional arguments for _run
        """
        from llmling_agent import Agent
    
        user_msg, prompts = await self.pre_run(*prompt)
        message = await self._run(
            *prompts,
            store_history=store_history,
            conversation_id=user_msg.conversation_id,
            output_type=output_type,
            **kwargs,
        )
    
        # For chain processing, update the response's chain
        if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
            message = message.forwarded(prompt[0])
    
        if store_history and isinstance(self, Agent):
            self.conversation.add_chat_messages([user_msg, message])
        self.message_sent.emit(message)
        await self.connections.route_message(message, wait=wait_for_connections)
        return message
    
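    A usage sketch; the typed call assumes the concrete node's _run honors output_type:

    async def ask(node) -> None:
        # Untyped result: stored in history (for agents) and routed onward
        reply = await node.run("What changed since yesterday?")
        print(reply.content)

        # Structured result, skipping conversation history
        count = await node.run("How many files?", output_type=int, store_history=False)
        print(count.content + 1)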

    run_iter abstractmethod

    run_iter(*prompts: Any, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]
    

    Yield messages during execution.

    Source code in src/llmling_agent/messaging/messagenode.py
    @abstractmethod
    def run_iter(
        self,
        *prompts: Any,
        **kwargs: Any,
    ) -> AsyncIterator[ChatMessage[Any]]:
        """Yield messages during execution."""
    
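    A consumption sketch; how many intermediate messages are yielded depends on the concrete subclass:

    async def stream(node) -> None:
        # Messages arrive as the node produces them during one execution
        async for message in node.run_iter("Walk me through the steps"):
            print(message.role, message.content)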
