Skip to content

MessageNode

Sub classes

Name Children Inherits
Agent
llmling_agent.agent.agent
Agent for AI-powered interaction with LLMling resources and tools.
    StructuredAgent
    llmling_agent.agent.structured
    Wrapper for Agent that enforces a specific result type.
      BaseTeam
      llmling_agent.delegation.base_team
      Base class for Team and TeamRun.

      Base classes

      Name Children Inherits
      MessageEmitter
      llmling_agent.messaging.messageemitter
      Base class for all message processing nodes.
      Generic
      typing
      Abstract base class for generic types.

      ⋔ Inheritance diagram

      graph TD
        94111465455024["messagenode.MessageNode"]
        94111465799152["messageemitter.MessageEmitter"]
        94111462661808["tasks.TaskManagerMixin"]
        139887694254272["builtins.object"]
        94111414070624["abc.ABC"]
        94111413919344["typing.Generic"]
        94111465799152 --> 94111465455024
        94111462661808 --> 94111465799152
        139887694254272 --> 94111462661808
        94111414070624 --> 94111465799152
        139887694254272 --> 94111414070624
        94111413919344 --> 94111465799152
        139887694254272 --> 94111413919344
        94111413919344 --> 94111465455024

      🛈 DocStrings

      Bases: MessageEmitter[TDeps, TResult]

      Base class for all message processing nodes.

      Source code in src/llmling_agent/messaging/messagenode.py
       28
       29
       30
       31
       32
       33
       34
       35
       36
       37
       38
       39
       40
       41
       42
       43
       44
       45
       46
       47
       48
       49
       50
       51
       52
       53
       54
       55
       56
       57
       58
       59
       60
       61
       62
       63
       64
       65
       66
       67
       68
       69
       70
       71
       72
       73
       74
       75
       76
       77
       78
       79
       80
       81
       82
       83
       84
       85
       86
       87
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      class MessageNode[TDeps, TResult](MessageEmitter[TDeps, TResult]):
          """Base class for all message processing nodes.

          Extends MessageEmitter with tool-usage signalling and the standard
          pre_run / run / run_iter execution pipeline shared by all nodes.
          """

          tool_used = Signal(ToolCallInfo)
          """Signal emitted when node uses a tool."""

          async def pre_run(
              self,
              *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
          ) -> tuple[ChatMessage[Any], list[Content | str]]:
              """Hook to prepare a MessageNode run call.

              Args:
                  *prompt: The prompt(s) to prepare.

              Returns:
                  A tuple of:
                      - Either incoming message, or a constructed incoming message based
                        on the prompt(s).
                      - A list of prompts to be sent to the model.
              """
              if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                  user_msg = prompt[0]
                  prompts = await convert_prompts([user_msg.content])
                  # Update received message's chain to show it came through its source
                  # NOTE(review): user_msg IS prompt[0] at this point, so this forwards
                  # the message to itself — confirm forwarded() is intended to take the
                  # message's own reference here rather than its original source.
                  user_msg = user_msg.forwarded(prompt[0])
                  # clear cost info to avoid double-counting
                  user_msg = replace(user_msg, role="user", cost_info=None)
                  final_prompt = "\n\n".join(str(p) for p in prompts)
              else:
                  prompts = await convert_prompts(prompt)
                  final_prompt = "\n\n".join(str(p) for p in prompts)
                  # use format_prompts?
                  # No incoming ChatMessage, so a fresh conversation id is minted.
                  user_msg = ChatMessage[str](
                      content=final_prompt,
                      role="user",
                      conversation_id=str(uuid4()),
                  )
              self.message_received.emit(user_msg)
              self.context.current_prompt = final_prompt
              return user_msg, prompts

          # async def post_run(
          #     self,
          #     message: ChatMessage[TResult],
          #     previous_message: ChatMessage[Any] | None,
          #     wait_for_connections: bool | None = None,
          # ) -> ChatMessage[Any]:
          #     # For chain processing, update the response's chain
          #     if previous_message:
          #         message = message.forwarded(previous_message)
          #         conversation_id = previous_message.conversation_id
          #     else:
          #         conversation_id = str(uuid4())
          #     # Set conversation_id on response message
          #     message = replace(message, conversation_id=conversation_id)
          #     self.message_sent.emit(message)
          #     await self.connections.route_message(message, wait=wait_for_connections)
          #     return message

          async def run(
              self,
              *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
              wait_for_connections: bool | None = None,
              store_history: bool = True,
              **kwargs: Any,
          ) -> ChatMessage[TResult]:
              """Execute node with prompts and handle message routing.

              Args:
                  prompt: Input prompts
                  wait_for_connections: Whether to wait for forwarded messages
                  store_history: Whether to store in conversation history
                  **kwargs: Additional arguments for _run

              Returns:
                  The response message produced by this node's _run implementation.
              """
              # Deferred import — presumably avoids a circular import at module
              # load time; confirm against the package layout.
              from llmling_agent import Agent, StructuredAgent

              user_msg, prompts = await self.pre_run(*prompt)
              message = await self._run(
                  *prompts,
                  store_history=store_history,
                  conversation_id=user_msg.conversation_id,
                  **kwargs,
              )

              # For chain processing, update the response's chain
              if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                  message = message.forwarded(prompt[0])

              # Only Agent-like nodes own a conversation object to persist into.
              if store_history and isinstance(self, Agent | StructuredAgent):
                  self.conversation.add_chat_messages([user_msg, message])
              self.message_sent.emit(message)
              await self.connections.route_message(message, wait=wait_for_connections)
              return message

          @abstractmethod
          def run_iter(
              self,
              *prompts: Any,
              **kwargs: Any,
          ) -> AsyncIterator[ChatMessage[Any]]:
              """Yield messages during execution.

              Args:
                  *prompts: Input prompts to process.
                  **kwargs: Implementation-specific options.

              Yields:
                  ChatMessage objects as they are produced during the run.
              """
      

      tool_used class-attribute instance-attribute

      tool_used = Signal(ToolCallInfo)
      

      Signal emitted when node uses a tool.

      pre_run async

      pre_run(
          *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
      ) -> tuple[ChatMessage[Any], list[Content | str]]
      

      Hook to prepare a MessageNode run call.

      Parameters:

      Name Type Description Default
      *prompt AnyPromptType | Image | PathLike[str] | ChatMessage

      The prompt(s) to prepare.

      ()

      Returns:

      Type Description
      tuple[ChatMessage[Any], list[Content | str]]

      A tuple of: - Either the incoming message, or a newly constructed incoming message based on the prompt(s). - A list of prompts to be sent to the model.

      Source code in src/llmling_agent/messaging/messagenode.py
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      async def pre_run(
          self,
          *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
      ) -> tuple[ChatMessage[Any], list[Content | str]]:
          """Hook to prepare a MessageNode run call.

          Args:
              *prompt: The prompt(s) to prepare.

          Returns:
              A tuple of:
                  - Either incoming message, or a constructed incoming message based
                    on the prompt(s).
                  - A list of prompts to be sent to the model.
          """
          if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
              user_msg = prompt[0]
              prompts = await convert_prompts([user_msg.content])
              # Update received message's chain to show it came through its source
              # NOTE(review): user_msg IS prompt[0] at this point, so this forwards
              # the message to itself — confirm forwarded() is intended to take the
              # message's own reference here rather than its original source.
              user_msg = user_msg.forwarded(prompt[0])
              # clear cost info to avoid double-counting
              user_msg = replace(user_msg, role="user", cost_info=None)
              final_prompt = "\n\n".join(str(p) for p in prompts)
          else:
              prompts = await convert_prompts(prompt)
              final_prompt = "\n\n".join(str(p) for p in prompts)
              # use format_prompts?
              # No incoming ChatMessage, so a fresh conversation id is minted.
              user_msg = ChatMessage[str](
                  content=final_prompt,
                  role="user",
                  conversation_id=str(uuid4()),
              )
          self.message_received.emit(user_msg)
          self.context.current_prompt = final_prompt
          return user_msg, prompts
      

      run async

      run(
          *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
          wait_for_connections: bool | None = None,
          store_history: bool = True,
          **kwargs: Any,
      ) -> ChatMessage[TResult]
      

      Execute node with prompts and handle message routing.

      Parameters:

      Name Type Description Default
      prompt AnyPromptType | Image | PathLike[str] | ChatMessage

      Input prompts

      ()
      wait_for_connections bool | None

      Whether to wait for forwarded messages

      None
      store_history bool

      Whether to store in conversation history

      True
      **kwargs Any

      Additional arguments for _run

      {}
      Source code in src/llmling_agent/messaging/messagenode.py
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      async def run(
          self,
          *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
          wait_for_connections: bool | None = None,
          store_history: bool = True,
          **kwargs: Any,
      ) -> ChatMessage[TResult]:
          """Execute node with prompts and handle message routing.

          Args:
              prompt: Input prompts
              wait_for_connections: Whether to wait for forwarded messages
              store_history: Whether to store in conversation history
              **kwargs: Additional arguments for _run

          Returns:
              The response message produced by this node's _run implementation.
          """
          # Deferred import — presumably avoids a circular import at module
          # load time; confirm against the package layout.
          from llmling_agent import Agent, StructuredAgent

          user_msg, prompts = await self.pre_run(*prompt)
          message = await self._run(
              *prompts,
              store_history=store_history,
              conversation_id=user_msg.conversation_id,
              **kwargs,
          )

          # For chain processing, update the response's chain
          if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
              message = message.forwarded(prompt[0])

          # Only Agent-like nodes own a conversation object to persist into.
          if store_history and isinstance(self, Agent | StructuredAgent):
              self.conversation.add_chat_messages([user_msg, message])
          self.message_sent.emit(message)
          await self.connections.route_message(message, wait=wait_for_connections)
          return message
      

      run_iter abstractmethod

      run_iter(*prompts: Any, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]
      

      Yield messages during execution.

      Source code in src/llmling_agent/messaging/messagenode.py
      123
      124
      125
      126
      127
      128
      129
      @abstractmethod
      def run_iter(
          self,
          *prompts: Any,
          **kwargs: Any,
      ) -> AsyncIterator[ChatMessage[Any]]:
          """Yield messages during execution.

          Args:
              *prompts: Input prompts to process.
              **kwargs: Implementation-specific options.

          Yields:
              ChatMessage objects as they are produced during the run.
          """
      

      Show source on GitHub