Skip to content

messagenode

Class info

Classes

Name Children Inherits
ChatMessage
llmling_agent.messaging.messages
Common message format for all UI types.
    MessageEmitter
    llmling_agent.messaging.messageemitter
    Base class for all message processing nodes.
    MessageNode
    llmling_agent.messaging.messagenode
    Base class for all message processing nodes.
    ToolCallInfo
    llmling_agent.models.tools
    Information about an executed tool call.

      🛈 DocStrings

      Base class for message processing nodes.

      MessageNode

      Bases: MessageEmitter[TDeps, TResult]

      Base class for all message processing nodes.

      Source code in src/llmling_agent/messaging/messagenode.py
       28
       29
       30
       31
       32
       33
       34
       35
       36
       37
       38
       39
       40
       41
       42
       43
       44
       45
       46
       47
       48
       49
       50
       51
       52
       53
       54
       55
       56
       57
       58
       59
       60
       61
       62
       63
       64
       65
       66
       67
       68
       69
       70
       71
       72
       73
       74
       75
       76
       77
       78
       79
       80
       81
       82
       83
       84
       85
       86
       87
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      class MessageNode[TDeps, TResult](MessageEmitter[TDeps, TResult]):
          """Base class for all message processing nodes."""
      
          tool_used = Signal(ToolCallInfo)
          """Signal emitted when node uses a tool."""
      
          async def pre_run(
              self,
              *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
          ) -> tuple[ChatMessage[Any], list[Content | str]]:
              """Hook to prepare a MessgeNode run call.
      
              Args:
                  *prompt: The prompt(s) to prepare.
      
              Returns:
                  A tuple of:
                      - Either incoming message, or a constructed incoming message based
                        on the prompt(s).
                      - A list of prompts to be sent to the model.
              """
              if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                  user_msg = prompt[0]
                  prompts = await convert_prompts([user_msg.content])
                  # Update received message's chain to show it came through its source
                  user_msg = user_msg.forwarded(prompt[0])
                  # clear cost info to avoid double-counting
                  user_msg = replace(user_msg, role="user", cost_info=None)
                  final_prompt = "\n\n".join(str(p) for p in prompts)
              else:
                  prompts = await convert_prompts(prompt)
                  final_prompt = "\n\n".join(str(p) for p in prompts)
                  # use format_prompts?
                  user_msg = ChatMessage[str](
                      content=final_prompt,
                      role="user",
                      conversation_id=str(uuid4()),
                  )
              self.message_received.emit(user_msg)
              self.context.current_prompt = final_prompt
              return user_msg, prompts
      
          # async def post_run(
          #     self,
          #     message: ChatMessage[TResult],
          #     previous_message: ChatMessage[Any] | None,
          #     wait_for_connections: bool | None = None,
          # ) -> ChatMessage[Any]:
          #     # For chain processing, update the response's chain
          #     if previous_message:
          #         message = message.forwarded(previous_message)
          #         conversation_id = previous_message.conversation_id
          #     else:
          #         conversation_id = str(uuid4())
          #     # Set conversation_id on response message
          #     message = replace(message, conversation_id=conversation_id)
          #     self.message_sent.emit(message)
          #     await self.connections.route_message(message, wait=wait_for_connections)
          #     return message
      
          async def run(
              self,
              *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
              wait_for_connections: bool | None = None,
              store_history: bool = True,
              **kwargs: Any,
          ) -> ChatMessage[TResult]:
              """Execute node with prompts and handle message routing.
      
              Args:
                  prompt: Input prompts
                  wait_for_connections: Whether to wait for forwarded messages
                  store_history: Whether to store in conversation history
                  **kwargs: Additional arguments for _run
              """
              from llmling_agent import Agent, StructuredAgent
      
              user_msg, prompts = await self.pre_run(*prompt)
              message = await self._run(
                  *prompts,
                  store_history=store_history,
                  conversation_id=user_msg.conversation_id,
                  **kwargs,
              )
      
              # For chain processing, update the response's chain
              if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                  message = message.forwarded(prompt[0])
      
              if store_history and isinstance(self, Agent | StructuredAgent):
                  self.conversation.add_chat_messages([user_msg, message])
              self.message_sent.emit(message)
              await self.connections.route_message(message, wait=wait_for_connections)
              return message
      
          @abstractmethod
          def run_iter(
              self,
              *prompts: Any,
              **kwargs: Any,
          ) -> AsyncIterator[ChatMessage[Any]]:
              """Yield messages during execution."""
      

      tool_used class-attribute instance-attribute

      tool_used = Signal(ToolCallInfo)
      

      Signal emitted when node uses a tool.

      pre_run async

      pre_run(
          *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
      ) -> tuple[ChatMessage[Any], list[Content | str]]
      

      Hook to prepare a MessgeNode run call.

      Parameters:

      Name Type Description Default
      *prompt AnyPromptType | Image | PathLike[str] | ChatMessage

      The prompt(s) to prepare.

      ()

      Returns:

      Type Description
      tuple[ChatMessage[Any], list[Content | str]]

      A tuple of: - Either incoming message, or a constructed incoming message based on the prompt(s). - A list of prompts to be sent to the model.

      Source code in src/llmling_agent/messaging/messagenode.py
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      async def pre_run(
          self,
          *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
      ) -> tuple[ChatMessage[Any], list[Content | str]]:
          """Hook to prepare a MessgeNode run call.
      
          Args:
              *prompt: The prompt(s) to prepare.
      
          Returns:
              A tuple of:
                  - Either incoming message, or a constructed incoming message based
                    on the prompt(s).
                  - A list of prompts to be sent to the model.
          """
          if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
              user_msg = prompt[0]
              prompts = await convert_prompts([user_msg.content])
              # Update received message's chain to show it came through its source
              user_msg = user_msg.forwarded(prompt[0])
              # clear cost info to avoid double-counting
              user_msg = replace(user_msg, role="user", cost_info=None)
              final_prompt = "\n\n".join(str(p) for p in prompts)
          else:
              prompts = await convert_prompts(prompt)
              final_prompt = "\n\n".join(str(p) for p in prompts)
              # use format_prompts?
              user_msg = ChatMessage[str](
                  content=final_prompt,
                  role="user",
                  conversation_id=str(uuid4()),
              )
          self.message_received.emit(user_msg)
          self.context.current_prompt = final_prompt
          return user_msg, prompts
      

      run async

      run(
          *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
          wait_for_connections: bool | None = None,
          store_history: bool = True,
          **kwargs: Any,
      ) -> ChatMessage[TResult]
      

      Execute node with prompts and handle message routing.

      Parameters:

      Name Type Description Default
      prompt AnyPromptType | Image | PathLike[str] | ChatMessage

      Input prompts

      ()
      wait_for_connections bool | None

      Whether to wait for forwarded messages

      None
      store_history bool

      Whether to store in conversation history

      True
      **kwargs Any

      Additional arguments for _run

      {}
      Source code in src/llmling_agent/messaging/messagenode.py
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      async def run(
          self,
          *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
          wait_for_connections: bool | None = None,
          store_history: bool = True,
          **kwargs: Any,
      ) -> ChatMessage[TResult]:
          """Execute node with prompts and handle message routing.
      
          Args:
              prompt: Input prompts
              wait_for_connections: Whether to wait for forwarded messages
              store_history: Whether to store in conversation history
              **kwargs: Additional arguments for _run
          """
          from llmling_agent import Agent, StructuredAgent
      
          user_msg, prompts = await self.pre_run(*prompt)
          message = await self._run(
              *prompts,
              store_history=store_history,
              conversation_id=user_msg.conversation_id,
              **kwargs,
          )
      
          # For chain processing, update the response's chain
          if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
              message = message.forwarded(prompt[0])
      
          if store_history and isinstance(self, Agent | StructuredAgent):
              self.conversation.add_chat_messages([user_msg, message])
          self.message_sent.emit(message)
          await self.connections.route_message(message, wait=wait_for_connections)
          return message
      

      run_iter abstractmethod

      run_iter(*prompts: Any, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]
      

      Yield messages during execution.

      Source code in src/llmling_agent/messaging/messagenode.py
      123
      124
      125
      126
      127
      128
      129
      @abstractmethod
      def run_iter(
          self,
          *prompts: Any,
          **kwargs: Any,
      ) -> AsyncIterator[ChatMessage[Any]]:
          """Yield messages during execution."""