
messageemitter

Class info

Classes

Name (Module): Description

AggregatedMessageStats (llmling_agent.talk.stats): Statistics aggregated from multiple connections.
AggregatedTalkStats (llmling_agent.talk.stats): Statistics aggregated from multiple connections.
ChatMessage (llmling_agent.messaging.messages): Common message format for all UI types.
MCPManager (llmling_agent.mcp_server.manager): Manages MCP server connections and tools.
MessageEmitter (llmling_agent.messaging.messageemitter): Base class for all message processing nodes.
MessageStats (llmling_agent.talk.stats): Statistics for a single connection.
TaskManagerMixin (llmling_agent.utils.tasks): Mixin for managing async tasks.
ToolCallInfo (llmling_agent.models.tools): Information about an executed tool call.

          🛈 DocStrings

          Base class for message processing nodes.

          MessageEmitter

          Bases: TaskManagerMixin, ABC

          Base class for all message processing nodes.

          Source code in src/llmling_agent/messaging/messageemitter.py
          class MessageEmitter[TDeps, TResult](TaskManagerMixin, ABC):
              """Base class for all message processing nodes."""
          
              outbox = Signal(object)  # ChatMessage
              """Signal emitted when node produces a message."""
          
              message_received = Signal(ChatMessage)
              """Signal emitted when node receives a message."""
          
              message_sent = Signal(ChatMessage)
              """Signal emitted when node creates a message."""
          
              tool_used = Signal(ToolCallInfo)
              """Signal emitted when node uses a tool."""
          
              def __init__(
                  self,
                  name: str | None = None,
                  description: str | None = None,
                  context: NodeContext | None = None,
                  mcp_servers: Sequence[str | MCPServerConfig] | None = None,
                  enable_logging: bool = True,
              ):
                  """Initialize message node."""
                  super().__init__()
                  from llmling_agent.messaging.connection_manager import ConnectionManager
                  from llmling_agent.messaging.event_manager import EventManager
                  from llmling_agent.messaging.node_logger import NodeLogger
          
                  self._name = name or self.__class__.__name__
                  self.description = description
                  self.connections = ConnectionManager(self)
                  self._events = EventManager(self, enable_events=True)
                  servers = mcp_servers or []
                  self.mcp = MCPManager(
                      name=f"node_{self._name}",
                      servers=servers,
                      context=context,
                      owner=self.name,
                  )
                  self._logger = NodeLogger(self, enable_db_logging=enable_logging)
          
              async def __aenter__(self) -> Self:
                  """Initialize base message node."""
                  try:
                      await self._events.__aenter__()
                      await self.mcp.__aenter__()
                  except Exception as e:
                      await self.__aexit__(type(e), e, e.__traceback__)
                      msg = f"Failed to initialize {self.name}"
                      raise RuntimeError(msg) from e
                  else:
                      return self
          
              async def __aexit__(
                  self,
                  exc_type: type[BaseException] | None,
                  exc_val: BaseException | None,
                  exc_tb: TracebackType | None,
              ):
                  """Clean up base resources."""
                  await self._events.cleanup()
                  await self.mcp.__aexit__(exc_type, exc_val, exc_tb)
                  await self.cleanup_tasks()
          
              @property
              @abstractmethod
              def stats(self) -> MessageStats | AggregatedMessageStats:
                  """Get stats for this node."""
                  raise NotImplementedError
          
              @property
              def connection_stats(self) -> AggregatedTalkStats:
                  """Get stats for all active connections of this node."""
                  return AggregatedTalkStats(
                      stats=[talk.stats for talk in self.connections.get_connections()]
                  )
          
              @property
              def context(self) -> NodeContext:
                  """Get node context."""
                  raise NotImplementedError
          
              @property
              def name(self) -> str:
                  """Get agent name."""
                  return self._name or "llmling-agent"
          
              @name.setter
              def name(self, value: str):
                  self._name = value
          
              @overload
              def __rshift__(
                  self, other: MessageNode[Any, Any] | ProcessorCallback[Any]
              ) -> Talk[TResult]: ...
          
              @overload
              def __rshift__(
                  self, other: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]]
              ) -> TeamTalk[TResult]: ...
          
              def __rshift__(
                  self,
                  other: MessageNode[Any, Any]
                  | ProcessorCallback[Any]
                  | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
              ) -> Talk[Any] | TeamTalk[Any]:
                  """Connect agent to another agent or group.
          
                  Example:
                      agent >> other_agent  # Connect to single agent
                      agent >> (agent2 & agent3)  # Connect to group
                      agent >> "other_agent"  # Connect by name (needs pool)
                  """
                  return self.connect_to(other)
          
              @overload
              def connect_to(
                  self,
                  target: MessageNode[Any, Any] | ProcessorCallback[Any],
                  *,
                  queued: Literal[True],
                  queue_strategy: Literal["concat"],
              ) -> Talk[str]: ...
          
              @overload
              def connect_to(
                  self,
                  target: MessageNode[Any, Any] | ProcessorCallback[Any],
                  *,
                  connection_type: ConnectionType = "run",
                  name: str | None = None,
                  priority: int = 0,
                  delay: timedelta | None = None,
                  queued: bool = False,
                  queue_strategy: QueueStrategy = "latest",
                  transform: AnyTransformFn | None = None,
                  filter_condition: AsyncFilterFn | None = None,
                  stop_condition: AsyncFilterFn | None = None,
                  exit_condition: AsyncFilterFn | None = None,
              ) -> Talk[TResult]: ...
          
              @overload
              def connect_to(
                  self,
                  target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                  *,
                  queued: Literal[True],
                  queue_strategy: Literal["concat"],
              ) -> TeamTalk[str]: ...
          
              @overload
              def connect_to(
                  self,
                  target: Sequence[MessageNode[Any, TResult] | ProcessorCallback[TResult]],
                  *,
                  connection_type: ConnectionType = "run",
                  name: str | None = None,
                  priority: int = 0,
                  delay: timedelta | None = None,
                  queued: bool = False,
                  queue_strategy: QueueStrategy = "latest",
                  transform: AnyTransformFn | None = None,
                  filter_condition: AsyncFilterFn | None = None,
                  stop_condition: AsyncFilterFn | None = None,
                  exit_condition: AsyncFilterFn | None = None,
              ) -> TeamTalk[TResult]: ...
          
              @overload
              def connect_to(
                  self,
                  target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                  *,
                  connection_type: ConnectionType = "run",
                  name: str | None = None,
                  priority: int = 0,
                  delay: timedelta | None = None,
                  queued: bool = False,
                  queue_strategy: QueueStrategy = "latest",
                  transform: AnyTransformFn | None = None,
                  filter_condition: AsyncFilterFn | None = None,
                  stop_condition: AsyncFilterFn | None = None,
                  exit_condition: AsyncFilterFn | None = None,
              ) -> TeamTalk: ...
          
              def connect_to(
                  self,
                  target: MessageNode[Any, Any]
                  | ProcessorCallback[Any]
                  | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                  *,
                  connection_type: ConnectionType = "run",
                  name: str | None = None,
                  priority: int = 0,
                  delay: timedelta | None = None,
                  queued: bool = False,
                  queue_strategy: QueueStrategy = "latest",
                  transform: AnyTransformFn | None = None,
                  filter_condition: AsyncFilterFn | None = None,
                  stop_condition: AsyncFilterFn | None = None,
                  exit_condition: AsyncFilterFn | None = None,
              ) -> Talk[Any] | TeamTalk:
                  """Create connection(s) to target(s)."""
                  # Handle callable case
                  from llmling_agent.agent import Agent, StructuredAgent
                  from llmling_agent.delegation.base_team import BaseTeam
                  from llmling_agent.messaging.messagenode import MessageNode
          
                  if callable(target):
                      if has_return_type(target, str):
                          target = Agent.from_callback(target)
                      else:
                          target = StructuredAgent.from_callback(target)
                      if pool := self.context.pool:
                          pool.register(target.name, target)
                  # we are explicit here just to make the distinction clear; we only
                  # want sequences of message units
                  if isinstance(target, Sequence) and not isinstance(target, BaseTeam):
                      targets: list[MessageNode] = []
                      for t in target:
                          match t:
                              case _ if callable(t):
                                  if has_return_type(t, str):
                                      other: MessageNode = Agent.from_callback(t)
                                  else:
                                      other = StructuredAgent.from_callback(t)
                                  if pool := self.context.pool:
                                      pool.register(other.name, other)
                                  targets.append(other)
                              case MessageNode():
                                  targets.append(t)
                              case _:
                                  msg = f"Invalid node type: {type(t)}"
                                  raise TypeError(msg)
                  else:
                      targets = target  # type: ignore
                  return self.connections.create_connection(
                      self,
                      targets,
                      connection_type=connection_type,
                      priority=priority,
                      name=name,
                      delay=delay,
                      queued=queued,
                      queue_strategy=queue_strategy,
                      transform=transform,
                      filter_condition=filter_condition,
                      stop_condition=stop_condition,
                      exit_condition=exit_condition,
                  )
          
              async def disconnect_all(self):
                  """Disconnect from all nodes."""
                  for target in list(self.connections.get_targets()):
                      self.stop_passing_results_to(target)
          
              def stop_passing_results_to(self, other: MessageNode):
                  """Stop forwarding results to another node."""
                  self.connections.disconnect(other)
          
              async def pre_run(
                  self,
                  *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
              ) -> tuple[ChatMessage[Any], list[Content | str]]:
                  """Hook to prepare a MessgeNode run call.
          
                  Args:
                      *prompt: The prompt(s) to prepare.
          
                  Returns:
                      A tuple of:
                          - Either incoming message, or a constructed incoming message based
                            on the prompt(s).
                          - A list of prompts to be sent to the model.
                  """
                  if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                      user_msg = prompt[0]
                      prompts = await convert_prompts([user_msg.content])
                      # Update received message's chain to show it came through its source
                      user_msg = user_msg.forwarded(prompt[0])
                      user_msg = replace(
                          user_msg,
                          role="user",  # change role since "perspective" changes
                          cost_info=None,  # Clear cost info to avoid double-counting
                      )
                      final_prompt = "\n\n".join(str(p) for p in prompts)
                  else:
                      prompts = await convert_prompts(prompt)
                      final_prompt = "\n\n".join(str(p) for p in prompts)
                      # use format_prompts?
                      user_msg = ChatMessage[str](
                          content=final_prompt,
                          role="user",
                          conversation_id=str(uuid4()),
                      )
                  self.message_received.emit(user_msg)
                  self.context.current_prompt = final_prompt
                  return user_msg, prompts
          
              # async def post_run(
              #     self,
              #     message: ChatMessage[TResult],
              #     previous_message: ChatMessage[Any] | None,
              #     wait_for_connections: bool | None = None,
              # ) -> ChatMessage[Any]:
              #     # For chain processing, update the response's chain
              #     if previous_message:
              #         message = message.forwarded(previous_message)
              #         conversation_id = previous_message.conversation_id
              #     else:
              #         conversation_id = str(uuid4())
              #     # Set conversation_id on response message
              #     message = replace(message, conversation_id=conversation_id)
              #     self.message_sent.emit(message)
              #     await self.connections.route_message(message, wait=wait_for_connections)
              #     return message
          
              async def run(
                  self,
                  *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
                  wait_for_connections: bool | None = None,
                  store_history: bool = True,
                  **kwargs: Any,
              ) -> ChatMessage[TResult]:
                  """Execute node with prompts and handle message routing.
          
                  Args:
                      prompt: Input prompts
                      wait_for_connections: Whether to wait for forwarded messages
                      store_history: Whether to store in conversation history
                      **kwargs: Additional arguments for _run
                  """
                  from llmling_agent import Agent, StructuredAgent
          
                  user_msg, prompts = await self.pre_run(*prompt)
                  message = await self._run(
                      *prompts,
                      store_history=store_history,
                      conversation_id=user_msg.conversation_id,
                      **kwargs,
                  )
          
                  # For chain processing, update the response's chain
                  if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                      message = message.forwarded(prompt[0])
          
                  if store_history and isinstance(self, Agent | StructuredAgent):
                      self.conversation.add_chat_messages([user_msg, message])
                  self.message_sent.emit(message)
                  await self.connections.route_message(message, wait=wait_for_connections)
                  return message
          
              @abstractmethod
              def _run(
                  self,
                  *prompts: Any,
                  **kwargs: Any,
              ) -> Coroutine[None, None, ChatMessage[TResult]]:
                  """Implementation-specific run logic."""
          

          connection_stats property

          connection_stats: AggregatedTalkStats
          

          Get stats for all active connections of this node.

          context property

          context: NodeContext
          

          Get node context.

          message_received class-attribute instance-attribute

          message_received = Signal(ChatMessage)
          

          Signal emitted when node receives a message.

          message_sent class-attribute instance-attribute

          message_sent = Signal(ChatMessage)
          

          Signal emitted when node creates a message.

          name property writable

          name: str
          

          Get agent name.

          outbox class-attribute instance-attribute

          outbox = Signal(object)
          

          Signal emitted when node produces a message.

          stats abstractmethod property

          stats: MessageStats | AggregatedMessageStats

          Get stats for this node.

          tool_used class-attribute instance-attribute

          tool_used = Signal(ToolCallInfo)
          

          Signal emitted when node uses a tool.

          __aenter__ async

          __aenter__() -> Self
          

          Initialize base message node.

          Source code in src/llmling_agent/messaging/messageemitter.py
          async def __aenter__(self) -> Self:
              """Initialize base message node."""
              try:
                  await self._events.__aenter__()
                  await self.mcp.__aenter__()
              except Exception as e:
                  await self.__aexit__(type(e), e, e.__traceback__)
                  msg = f"Failed to initialize {self.name}"
                  raise RuntimeError(msg) from e
              else:
                  return self
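Note the rollback in the except branch: any sub-resource already entered is torn down via __aexit__ before the failure is re-raised. A toy sketch of the same pattern (Resource and Node here are hypothetical, not the library's classes):

```python
import asyncio


class Resource:
    """Toy sub-resource that tracks whether it is currently open."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.open = False

    async def __aenter__(self):
        if self.fail:
            raise ValueError("boom")
        self.open = True
        return self

    async def __aexit__(self, *exc):
        self.open = False


class Node:
    """Mirrors MessageEmitter.__aenter__: enter sub-resources, roll back on failure."""

    def __init__(self, events: Resource, mcp: Resource) -> None:
        self._events, self.mcp = events, mcp

    async def __aenter__(self):
        try:
            await self._events.__aenter__()
            await self.mcp.__aenter__()
        except Exception as e:
            # undo whatever was already acquired, then surface one clear error
            await self.__aexit__(type(e), e, e.__traceback__)
            raise RuntimeError("Failed to initialize node") from e
        else:
            return self

    async def __aexit__(self, *exc):
        await self._events.__aexit__(*exc)
        await self.mcp.__aexit__(*exc)


async def demo() -> tuple[bool, bool]:
    events, mcp = Resource(), Resource(fail=True)
    try:
        await Node(events, mcp).__aenter__()
    except RuntimeError:
        pass
    return events.open, mcp.open


opened = asyncio.run(demo())
print(opened)  # (False, False): events was rolled back, mcp never opened
```

Without the rollback, a failure in the second resource would leak the first one, since the caller never receives an object to exit.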
          

          __aexit__ async

          __aexit__(
              exc_type: type[BaseException] | None,
              exc_val: BaseException | None,
              exc_tb: TracebackType | None,
          )
          

          Clean up base resources.

          Source code in src/llmling_agent/messaging/messageemitter.py
          async def __aexit__(
              self,
              exc_type: type[BaseException] | None,
              exc_val: BaseException | None,
              exc_tb: TracebackType | None,
          ):
              """Clean up base resources."""
              await self._events.cleanup()
              await self.mcp.__aexit__(exc_type, exc_val, exc_tb)
              await self.cleanup_tasks()
          

          __init__

          __init__(
              name: str | None = None,
              description: str | None = None,
              context: NodeContext | None = None,
              mcp_servers: Sequence[str | MCPServerConfig] | None = None,
              enable_logging: bool = True,
          )
          

          Initialize message node.

          Source code in src/llmling_agent/messaging/messageemitter.py
          def __init__(
              self,
              name: str | None = None,
              description: str | None = None,
              context: NodeContext | None = None,
              mcp_servers: Sequence[str | MCPServerConfig] | None = None,
              enable_logging: bool = True,
          ):
              """Initialize message node."""
              super().__init__()
              from llmling_agent.messaging.connection_manager import ConnectionManager
              from llmling_agent.messaging.event_manager import EventManager
              from llmling_agent.messaging.node_logger import NodeLogger
          
              self._name = name or self.__class__.__name__
              self.description = description
              self.connections = ConnectionManager(self)
              self._events = EventManager(self, enable_events=True)
              servers = mcp_servers or []
              self.mcp = MCPManager(
                  name=f"node_{self._name}",
                  servers=servers,
                  context=context,
                  owner=self.name,
              )
              self._logger = NodeLogger(self, enable_db_logging=enable_logging)
          

          __rshift__

          __rshift__(other: MessageNode[Any, Any] | ProcessorCallback[Any]) -> Talk[TResult]
          
          __rshift__(
              other: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
          ) -> TeamTalk[TResult]
          
          __rshift__(
              other: MessageNode[Any, Any]
              | ProcessorCallback[Any]
              | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
          ) -> Talk[Any] | TeamTalk[Any]
          

          Connect agent to another agent or group.

          Example

          agent >> other_agent        # Connect to single agent
          agent >> (agent2 & agent3)  # Connect to group
          agent >> "other_agent"      # Connect by name (needs pool)
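The `>>` operator is sugar over connect_to(): a single target yields one connection, a sequence yields one per member. A toy sketch of that delegation (this Node class is hypothetical, not the real API, which returns Talk/TeamTalk objects):

```python
class Node:
    """Toy node whose >> operator records connections, like connect_to()."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.targets: list["Node"] = []

    def connect_to(self, other):
        # a sequence of nodes connects to each member (TeamTalk-style)
        if isinstance(other, (list, tuple)):
            self.targets.extend(other)
        else:
            self.targets.append(other)
        return self  # the real method returns a Talk/TeamTalk object

    def __rshift__(self, other):
        return self.connect_to(other)


a, b, c = Node("a"), Node("b"), Node("c")
a >> b       # single target
a >> [b, c]  # group of targets
names = [t.name for t in a.targets]
print(names)  # ['b', 'b', 'c']
```

Overloading `__rshift__` this way keeps pipelines readable while all routing logic stays in one method.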

          Source code in src/llmling_agent/messaging/messageemitter.py
          def __rshift__(
              self,
              other: MessageNode[Any, Any]
              | ProcessorCallback[Any]
              | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
          ) -> Talk[Any] | TeamTalk[Any]:
              """Connect agent to another agent or group.
          
              Example:
                  agent >> other_agent  # Connect to single agent
                  agent >> (agent2 & agent3)  # Connect to group
                  agent >> "other_agent"  # Connect by name (needs pool)
              """
              return self.connect_to(other)
          

          _run abstractmethod

          _run(*prompts: Any, **kwargs: Any) -> Coroutine[None, None, ChatMessage[TResult]]
          

          Implementation-specific run logic.

          Source code in src/llmling_agent/messaging/messageemitter.py
          @abstractmethod
          def _run(
              self,
              *prompts: Any,
              **kwargs: Any,
          ) -> Coroutine[None, None, ChatMessage[TResult]]:
              """Implementation-specific run logic."""
          

          connect_to

          connect_to(
              target: MessageNode[Any, Any] | ProcessorCallback[Any],
              *,
              queued: Literal[True],
              queue_strategy: Literal["concat"],
          ) -> Talk[str]
          
          connect_to(
              target: MessageNode[Any, Any] | ProcessorCallback[Any],
              *,
              connection_type: ConnectionType = "run",
              name: str | None = None,
              priority: int = 0,
              delay: timedelta | None = None,
              queued: bool = False,
              queue_strategy: QueueStrategy = "latest",
              transform: AnyTransformFn | None = None,
              filter_condition: AsyncFilterFn | None = None,
              stop_condition: AsyncFilterFn | None = None,
              exit_condition: AsyncFilterFn | None = None,
          ) -> Talk[TResult]
          
          connect_to(
              target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
              *,
              queued: Literal[True],
              queue_strategy: Literal["concat"],
          ) -> TeamTalk[str]
          
          connect_to(
              target: Sequence[MessageNode[Any, TResult] | ProcessorCallback[TResult]],
              *,
              connection_type: ConnectionType = "run",
              name: str | None = None,
              priority: int = 0,
              delay: timedelta | None = None,
              queued: bool = False,
              queue_strategy: QueueStrategy = "latest",
              transform: AnyTransformFn | None = None,
              filter_condition: AsyncFilterFn | None = None,
              stop_condition: AsyncFilterFn | None = None,
              exit_condition: AsyncFilterFn | None = None,
          ) -> TeamTalk[TResult]
          
          connect_to(
              target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
              *,
              connection_type: ConnectionType = "run",
              name: str | None = None,
              priority: int = 0,
              delay: timedelta | None = None,
              queued: bool = False,
              queue_strategy: QueueStrategy = "latest",
              transform: AnyTransformFn | None = None,
              filter_condition: AsyncFilterFn | None = None,
              stop_condition: AsyncFilterFn | None = None,
              exit_condition: AsyncFilterFn | None = None,
          ) -> TeamTalk
          
          connect_to(
              target: MessageNode[Any, Any]
              | ProcessorCallback[Any]
              | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
              *,
              connection_type: ConnectionType = "run",
              name: str | None = None,
              priority: int = 0,
              delay: timedelta | None = None,
              queued: bool = False,
              queue_strategy: QueueStrategy = "latest",
              transform: AnyTransformFn | None = None,
              filter_condition: AsyncFilterFn | None = None,
              stop_condition: AsyncFilterFn | None = None,
              exit_condition: AsyncFilterFn | None = None,
          ) -> Talk[Any] | TeamTalk
          

          Create connection(s) to target(s).

          Source code in src/llmling_agent/messaging/messageemitter.py
          def connect_to(
              self,
              target: MessageNode[Any, Any]
              | ProcessorCallback[Any]
              | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
              *,
              connection_type: ConnectionType = "run",
              name: str | None = None,
              priority: int = 0,
              delay: timedelta | None = None,
              queued: bool = False,
              queue_strategy: QueueStrategy = "latest",
              transform: AnyTransformFn | None = None,
              filter_condition: AsyncFilterFn | None = None,
              stop_condition: AsyncFilterFn | None = None,
              exit_condition: AsyncFilterFn | None = None,
          ) -> Talk[Any] | TeamTalk:
              """Create connection(s) to target(s)."""
              # Handle callable case
              from llmling_agent.agent import Agent, StructuredAgent
              from llmling_agent.delegation.base_team import BaseTeam
              from llmling_agent.messaging.messagenode import MessageNode
          
              if callable(target):
                  if has_return_type(target, str):
                      target = Agent.from_callback(target)
                  else:
                      target = StructuredAgent.from_callback(target)
                  if pool := self.context.pool:
                      pool.register(target.name, target)
        # we are explicit here just to make the distinction clear: we only want
        # sequences of message units
              if isinstance(target, Sequence) and not isinstance(target, BaseTeam):
                  targets: list[MessageNode] = []
                  for t in target:
                      match t:
                          case _ if callable(t):
                              if has_return_type(t, str):
                                  other: MessageNode = Agent.from_callback(t)
                              else:
                                  other = StructuredAgent.from_callback(t)
                              if pool := self.context.pool:
                                  pool.register(other.name, other)
                              targets.append(other)
                          case MessageNode():
                              targets.append(t)
                          case _:
                              msg = f"Invalid node type: {type(t)}"
                              raise TypeError(msg)
              else:
                  targets = target  # type: ignore
              return self.connections.create_connection(
                  self,
                  targets,
                  connection_type=connection_type,
                  priority=priority,
                  name=name,
                  delay=delay,
                  queued=queued,
                  queue_strategy=queue_strategy,
                  transform=transform,
                  filter_condition=filter_condition,
                  stop_condition=stop_condition,
                  exit_condition=exit_condition,
              )
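
The callable dispatch above hinges on the return-type check: callbacks annotated to return `str` become plain agents via `Agent.from_callback`, anything else becomes a `StructuredAgent`. A minimal, self-contained sketch of such a `has_return_type` check (a hypothetical re-implementation for illustration; the real helper lives in llmling_agent and may handle generics and unions):

```python
from typing import get_type_hints


def has_return_type(fn, typ) -> bool:
    # Hypothetical simplification: inspect the callback's annotated
    # return type and compare it to the requested type.
    hints = get_type_hints(fn)
    return hints.get("return") is typ


def summarize(text: str) -> str:
    return text.upper()


def score(text: str) -> int:
    return len(text)


assert has_return_type(summarize, str)   # would be wrapped as a plain Agent
assert not has_return_type(score, str)   # would be wrapped as a StructuredAgent
```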
          

          disconnect_all async

          disconnect_all()
          

          Disconnect from all nodes.

          Source code in src/llmling_agent/messaging/messageemitter.py
          async def disconnect_all(self):
              """Disconnect from all nodes."""
              for target in list(self.connections.get_targets()):
                  self.stop_passing_results_to(target)
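
Note that `disconnect_all` iterates over `list(self.connections.get_targets())` rather than the live collection, because each `stop_passing_results_to` call mutates the underlying connection set. A small self-contained sketch of the snapshot pattern:

```python
# Sketch of why disconnect_all snapshots the targets first: removing
# items from a collection while iterating it directly is unsafe, so we
# iterate a copy and mutate the original.
targets = {"a", "b", "c"}


def disconnect(t: str) -> None:
    targets.discard(t)


for t in list(targets):  # snapshot; safe to mutate `targets` inside the loop
    disconnect(t)

assert targets == set()
```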
          

          pre_run async

          pre_run(
              *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
          ) -> tuple[ChatMessage[Any], list[Content | str]]
          

          Hook to prepare a MessageNode run call.

          Parameters:

          Name Type Description Default
          *prompt AnyPromptType | Image | PathLike[str] | ChatMessage

          The prompt(s) to prepare.

          ()

          Returns:

          Type Description
          tuple[ChatMessage[Any], list[Content | str]]

          A tuple of:
          - the incoming message, or a constructed incoming message based on the prompt(s)
          - a list of prompts to be sent to the model

          Source code in src/llmling_agent/messaging/messageemitter.py
          async def pre_run(
              self,
              *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
          ) -> tuple[ChatMessage[Any], list[Content | str]]:
              """Hook to prepare a MessgeNode run call.
          
              Args:
                  *prompt: The prompt(s) to prepare.
          
              Returns:
                  A tuple of:
                      - Either incoming message, or a constructed incoming message based
                        on the prompt(s).
                      - A list of prompts to be sent to the model.
              """
              if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                  user_msg = prompt[0]
                  prompts = await convert_prompts([user_msg.content])
                  # Update received message's chain to show it came through its source
                  user_msg = user_msg.forwarded(prompt[0])
                  user_msg = replace(
                      user_msg,
                      role="user",  # change role since "perspective" changes
                      cost_info=None,  # Clear cost info to avoid double-counting
                  )
                  final_prompt = "\n\n".join(str(p) for p in prompts)
              else:
                  prompts = await convert_prompts(prompt)
                  final_prompt = "\n\n".join(str(p) for p in prompts)
                  # use format_prompts?
                  user_msg = ChatMessage[str](
                      content=final_prompt,
                      role="user",
                      conversation_id=str(uuid4()),
                  )
              self.message_received.emit(user_msg)
              self.context.current_prompt = final_prompt
              return user_msg, prompts
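
The `final_prompt` construction above is plain string concatenation: the converted prompts are joined with a blank line between them. A self-contained illustration:

```python
# Converted prompts are joined with a blank line to form the final prompt
# string that pre_run stores on context.current_prompt.
prompts = ["Summarize the report.", "Keep it under 100 words."]
final_prompt = "\n\n".join(str(p) for p in prompts)

assert final_prompt == "Summarize the report.\n\nKeep it under 100 words."
```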
          

          run async

          run(
              *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
              wait_for_connections: bool | None = None,
              store_history: bool = True,
              **kwargs: Any,
          ) -> ChatMessage[TResult]
          

          Execute node with prompts and handle message routing.

          Parameters:

          Name Type Description Default
          prompt AnyPromptType | Image | PathLike[str] | ChatMessage

          Input prompts

          ()
          wait_for_connections bool | None

          Whether to wait for forwarded messages

          None
          store_history bool

          Whether to store in conversation history

          True
          **kwargs Any

          Additional arguments for _run

          {}
          Source code in src/llmling_agent/messaging/messageemitter.py
          async def run(
              self,
              *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
              wait_for_connections: bool | None = None,
              store_history: bool = True,
              **kwargs: Any,
          ) -> ChatMessage[TResult]:
              """Execute node with prompts and handle message routing.
          
              Args:
                  prompt: Input prompts
                  wait_for_connections: Whether to wait for forwarded messages
                  store_history: Whether to store in conversation history
                  **kwargs: Additional arguments for _run
              """
              from llmling_agent import Agent, StructuredAgent
          
              user_msg, prompts = await self.pre_run(*prompt)
              message = await self._run(
                  *prompts,
                  store_history=store_history,
                  conversation_id=user_msg.conversation_id,
                  **kwargs,
              )
          
              # For chain processing, update the response's chain
              if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                  message = message.forwarded(prompt[0])
          
              if store_history and isinstance(self, Agent | StructuredAgent):
                  self.conversation.add_chat_messages([user_msg, message])
              self.message_sent.emit(message)
              await self.connections.route_message(message, wait=wait_for_connections)
              return message
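
`run` orchestrates a fixed sequence: `pre_run` prepares the incoming message, `_run` produces the result, and only then is the result emitted and routed to connected nodes. A toy mock (a hypothetical `MiniNode`, not the real class) showing that ordering:

```python
import asyncio


class MiniNode:
    """Hypothetical sketch of run()'s orchestration order."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def pre_run(self, prompt: str) -> str:
        self.events.append("pre_run")
        return prompt

    async def _run(self, prompt: str) -> str:
        self.events.append("_run")
        return prompt.upper()

    async def run(self, prompt: str) -> str:
        p = await self.pre_run(prompt)       # prepare incoming message
        result = await self._run(p)          # produce the response
        self.events.append("emit")           # stands in for message_sent.emit(...)
        self.events.append("route")          # stands in for connections.route_message(...)
        return result


node = MiniNode()
out = asyncio.run(node.run("hello"))
assert out == "HELLO"
assert node.events == ["pre_run", "_run", "emit", "route"]
```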
          

          stop_passing_results_to

          stop_passing_results_to(other: MessageNode)
          

          Stop forwarding results to another node.

          Source code in src/llmling_agent/messaging/messageemitter.py
          def stop_passing_results_to(self, other: MessageNode):
              """Stop forwarding results to another node."""
              self.connections.disconnect(other)