Skip to content

messageemitter

Class info

Classes

Name Children Inherits
AggregatedMessageStats
llmling_agent.talk.stats
Statistics aggregated from multiple connections.
AggregatedTalkStats
llmling_agent.talk.stats
Statistics aggregated from multiple connections.
    ChatMessage
    llmling_agent.messaging.messages
    Common message format for all UI types.
      MessageEmitter
      llmling_agent.messaging.messageemitter
      Base class for all message processing nodes.
      MessageStats
      llmling_agent.talk.stats
      Statistics for a single connection.
      TaskManagerMixin
      llmling_agent.utils.tasks
      Mixin for managing async tasks.
      ToolCallInfo
      llmling_agent.tools.tool_call_info
      Information about an executed tool call.

        🛈 DocStrings

        Base class for message processing nodes.

        MessageEmitter

        Bases: TaskManagerMixin, ABC

        Base class for all message processing nodes.

        Source code in src/llmling_agent/messaging/messageemitter.py
         43
         44
         45
         46
         47
         48
         49
         50
         51
         52
         53
         54
         55
         56
         57
         58
         59
         60
         61
         62
         63
         64
         65
         66
         67
         68
         69
         70
         71
         72
         73
         74
         75
         76
         77
         78
         79
         80
         81
         82
         83
         84
         85
         86
         87
         88
         89
         90
         91
         92
         93
         94
         95
         96
         97
         98
         99
        100
        101
        102
        103
        104
        105
        106
        107
        108
        109
        110
        111
        112
        113
        114
        115
        116
        117
        118
        119
        120
        121
        122
        123
        124
        125
        126
        127
        128
        129
        130
        131
        132
        133
        134
        135
        136
        137
        138
        139
        140
        141
        142
        143
        144
        145
        146
        147
        148
        149
        150
        151
        152
        153
        154
        155
        156
        157
        158
        159
        160
        161
        162
        163
        164
        165
        166
        167
        168
        169
        170
        171
        172
        173
        174
        175
        176
        177
        178
        179
        180
        181
        182
        183
        184
        185
        186
        187
        188
        189
        190
        191
        192
        193
        194
        195
        196
        197
        198
        199
        200
        201
        202
        203
        204
        205
        206
        207
        208
        209
        210
        211
        212
        213
        214
        215
        216
        217
        218
        219
        220
        221
        222
        223
        224
        225
        226
        227
        228
        229
        230
        231
        232
        233
        234
        235
        236
        237
        238
        239
        240
        241
        242
        243
        244
        245
        246
        247
        248
        249
        250
        251
        252
        253
        254
        255
        256
        257
        258
        259
        260
        261
        262
        263
        264
        265
        266
        267
        268
        269
        270
        271
        272
        273
        274
        275
        276
        277
        278
        279
        280
        281
        282
        283
        284
        285
        286
        287
        288
        289
        290
        291
        292
        293
        294
        295
        296
        297
        298
        299
        300
        301
        302
        303
        304
        305
        306
        307
        308
        309
        310
        311
        312
        313
        314
        315
        316
        317
        318
        319
        320
        321
        322
        323
        324
        325
        326
        327
        328
        329
        330
        331
        332
        333
        334
        335
        336
        337
        338
        339
        340
        341
        342
        343
        344
        345
        346
        347
        348
        349
        350
        351
        352
        353
        354
        355
        356
        357
        358
        359
        360
        361
        362
        363
        364
        365
        366
        367
        368
        369
        370
        371
        372
        373
        374
        375
        376
        377
        378
        379
        380
        381
        382
        383
        384
        385
        386
        387
        388
        389
        390
        391
        392
        393
        394
        395
        class MessageEmitter[TDeps, TResult](TaskManagerMixin, ABC):
            """Base class for all message processing nodes."""
        
            outbox = Signal(object)  # ChatMessage
            """Signal emitted when node produces a message."""
        
            message_received = Signal(ChatMessage)
            """Signal emitted when node receives a message."""
        
            message_sent = Signal(ChatMessage)
            """Signal emitted when node creates a message."""
        
            tool_used = Signal(ToolCallInfo)
            """Signal emitted when node uses a tool."""
        
            def __init__(
                self,
                name: str | None = None,
                description: str | None = None,
                context: NodeContext | None = None,
                mcp_servers: Sequence[str | MCPServerConfig] | None = None,
                enable_logging: bool = True,
            ):
                """Initialize message node."""
                super().__init__()
                from llmling_agent.mcp_server.manager import MCPManager
                from llmling_agent.messaging.connection_manager import ConnectionManager
                from llmling_agent.messaging.event_manager import EventManager
                from llmling_agent.messaging.node_logger import NodeLogger
        
                self._name = name or self.__class__.__name__
                self.description = description
                self.connections = ConnectionManager(self)
                self._events = EventManager(self, enable_events=True)
                servers = mcp_servers or []
                name = f"node_{self._name}"
                self.mcp = MCPManager(name, servers=servers, context=context, owner=self.name)
                self._logger = NodeLogger(self, enable_db_logging=enable_logging)
        
            async def __aenter__(self) -> Self:
                """Initialize base message node."""
                try:
                    await self._events.__aenter__()
                    await self.mcp.__aenter__()
                except Exception as e:
                    await self.__aexit__(type(e), e, e.__traceback__)
                    msg = f"Failed to initialize {self.name}"
                    raise RuntimeError(msg) from e
                else:
                    return self
        
            async def __aexit__(
                self,
                exc_type: type[BaseException] | None,
                exc_val: BaseException | None,
                exc_tb: TracebackType | None,
            ):
                """Clean up base resources."""
                await self._events.cleanup()
                await self.mcp.__aexit__(exc_type, exc_val, exc_tb)
                await self.cleanup_tasks()
        
            @property
            @abstractmethod
            def stats(self) -> MessageStats | AggregatedMessageStats:
                """Get stats for this node."""
                raise NotImplementedError
        
            @property
            def connection_stats(self) -> AggregatedTalkStats:
                """Get stats for all active connections of this node."""
                stats = [talk.stats for talk in self.connections.get_connections()]
                return AggregatedTalkStats(stats=stats)
        
            @property
            def context(self) -> NodeContext:
                """Get node context."""
                raise NotImplementedError
        
            @property
            def name(self) -> str:
                """Get agent name."""
                return self._name or "llmling-agent"
        
            @name.setter
            def name(self, value: str):
                self._name = value
        
            @overload
            def __rshift__(
                self, other: MessageNode[Any, Any] | ProcessorCallback[Any]
            ) -> Talk[TResult]: ...
        
            @overload
            def __rshift__(
                self, other: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]]
            ) -> TeamTalk[TResult]: ...
        
            def __rshift__(
                self,
                other: MessageNode[Any, Any]
                | ProcessorCallback[Any]
                | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            ) -> Talk[Any] | TeamTalk[Any]:
                """Connect agent to another agent or group.
        
                Example:
                    agent >> other_agent  # Connect to single agent
                    agent >> (agent2 & agent3)  # Connect to group
                    agent >> "other_agent"  # Connect by name (needs pool)
                """
                return self.connect_to(other)
        
            @overload
            def connect_to(
                self,
                target: MessageNode[Any, Any] | ProcessorCallback[Any],
                *,
                queued: Literal[True],
                queue_strategy: Literal["concat"],
            ) -> Talk[str]: ...
        
            @overload
            def connect_to(
                self,
                target: MessageNode[Any, Any] | ProcessorCallback[Any],
                *,
                connection_type: ConnectionType = "run",
                name: str | None = None,
                priority: int = 0,
                delay: timedelta | None = None,
                queued: bool = False,
                queue_strategy: QueueStrategy = "latest",
                transform: AnyTransformFn | None = None,
                filter_condition: AsyncFilterFn | None = None,
                stop_condition: AsyncFilterFn | None = None,
                exit_condition: AsyncFilterFn | None = None,
            ) -> Talk[TResult]: ...
        
            @overload
            def connect_to(
                self,
                target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                *,
                queued: Literal[True],
                queue_strategy: Literal["concat"],
            ) -> TeamTalk[str]: ...
        
            @overload
            def connect_to(
                self,
                target: Sequence[MessageNode[Any, TResult] | ProcessorCallback[TResult]],
                *,
                connection_type: ConnectionType = "run",
                name: str | None = None,
                priority: int = 0,
                delay: timedelta | None = None,
                queued: bool = False,
                queue_strategy: QueueStrategy = "latest",
                transform: AnyTransformFn | None = None,
                filter_condition: AsyncFilterFn | None = None,
                stop_condition: AsyncFilterFn | None = None,
                exit_condition: AsyncFilterFn | None = None,
            ) -> TeamTalk[TResult]: ...
        
            @overload
            def connect_to(
                self,
                target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                *,
                connection_type: ConnectionType = "run",
                name: str | None = None,
                priority: int = 0,
                delay: timedelta | None = None,
                queued: bool = False,
                queue_strategy: QueueStrategy = "latest",
                transform: AnyTransformFn | None = None,
                filter_condition: AsyncFilterFn | None = None,
                stop_condition: AsyncFilterFn | None = None,
                exit_condition: AsyncFilterFn | None = None,
            ) -> TeamTalk: ...
        
            def connect_to(
                self,
                target: MessageNode[Any, Any]
                | ProcessorCallback[Any]
                | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                *,
                connection_type: ConnectionType = "run",
                name: str | None = None,
                priority: int = 0,
                delay: timedelta | None = None,
                queued: bool = False,
                queue_strategy: QueueStrategy = "latest",
                transform: AnyTransformFn | None = None,
                filter_condition: AsyncFilterFn | None = None,
                stop_condition: AsyncFilterFn | None = None,
                exit_condition: AsyncFilterFn | None = None,
            ) -> Talk[Any] | TeamTalk:
                """Create connection(s) to target(s)."""
                # Handle callable case
                from llmling_agent import MessageNode
                from llmling_agent.agent import Agent, StructuredAgent
                from llmling_agent.delegation.base_team import BaseTeam
        
                if callable(target):
                    if has_return_type(target, str):
                        target = Agent.from_callback(target)
                    else:
                        target = StructuredAgent.from_callback(target)
                    if pool := self.context.pool:
                        pool.register(target.name, target)
                # we are explicit here just to make disctinction clear, we only want sequences
                # of message units
                if isinstance(target, Sequence) and not isinstance(target, BaseTeam):
                    targets: list[MessageNode] = []
                    for t in target:
                        match t:
                            case _ if callable(t):
                                if has_return_type(t, str):
                                    other: MessageNode = Agent.from_callback(t)
                                else:
                                    other = StructuredAgent.from_callback(t)
                                if pool := self.context.pool:
                                    pool.register(other.name, other)
                                targets.append(other)
                            case MessageNode():
                                targets.append(t)
                            case _:
                                msg = f"Invalid node type: {type(t)}"
                                raise TypeError(msg)
                else:
                    targets = target  # type: ignore
                return self.connections.create_connection(
                    self,
                    targets,
                    connection_type=connection_type,
                    priority=priority,
                    name=name,
                    delay=delay,
                    queued=queued,
                    queue_strategy=queue_strategy,
                    transform=transform,
                    filter_condition=filter_condition,
                    stop_condition=stop_condition,
                    exit_condition=exit_condition,
                )
        
            async def disconnect_all(self):
                """Disconnect from all nodes."""
                for target in list(self.connections.get_targets()):
                    self.stop_passing_results_to(target)
        
            def stop_passing_results_to(self, other: MessageNode):
                """Stop forwarding results to another node."""
                self.connections.disconnect(other)
        
            async def pre_run(
                self,
                *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
            ) -> tuple[ChatMessage[Any], list[Content | str]]:
                """Hook to prepare a MessgeNode run call.
        
                Args:
                    *prompt: The prompt(s) to prepare.
        
                Returns:
                    A tuple of:
                        - Either incoming message, or a constructed incoming message based
                          on the prompt(s).
                        - A list of prompts to be sent to the model.
                """
                if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                    user_msg = prompt[0]
                    prompts = await convert_prompts([user_msg.content])
                    # Update received message's chain to show it came through its source
                    user_msg = user_msg.forwarded(prompt[0])
                    # change role since "perspective" changes, clear cost to avoid counting twice
                    user_msg = replace(user_msg, role="user", cost_info=None)
                    final_prompt = "\n\n".join(str(p) for p in prompts)
                else:
                    prompts = await convert_prompts(prompt)
                    final_prompt = "\n\n".join(str(p) for p in prompts)
                    # use format_prompts?
                    user_msg = ChatMessage[str](
                        content=final_prompt,
                        role="user",
                        conversation_id=str(uuid4()),
                    )
                self.message_received.emit(user_msg)
                self.context.current_prompt = final_prompt
                return user_msg, prompts
        
            # async def post_run(
            #     self,
            #     message: ChatMessage[TResult],
            #     previous_message: ChatMessage[Any] | None,
            #     wait_for_connections: bool | None = None,
            # ) -> ChatMessage[Any]:
            #     # For chain processing, update the response's chain
            #     if previous_message:
            #         message = message.forwarded(previous_message)
            #         conversation_id = previous_message.conversation_id
            #     else:
            #         conversation_id = str(uuid4())
            #     # Set conversation_id on response message
            #     message = replace(message, conversation_id=conversation_id)
            #     self.message_sent.emit(message)
            #     await self.connections.route_message(message, wait=wait_for_connections)
            #     return message
        
            async def run(
                self,
                *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
                wait_for_connections: bool | None = None,
                store_history: bool = True,
                **kwargs: Any,
            ) -> ChatMessage[TResult]:
                """Execute node with prompts and handle message routing.
        
                Args:
                    prompt: Input prompts
                    wait_for_connections: Whether to wait for forwarded messages
                    store_history: Whether to store in conversation history
                    **kwargs: Additional arguments for _run
                """
                from llmling_agent import Agent, StructuredAgent
        
                user_msg, prompts = await self.pre_run(*prompt)
                message = await self._run(
                    *prompts,
                    store_history=store_history,
                    conversation_id=user_msg.conversation_id,
                    **kwargs,
                )
        
                # For chain processing, update the response's chain
                if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                    message = message.forwarded(prompt[0])
        
                if store_history and isinstance(self, Agent | StructuredAgent):
                    self.conversation.add_chat_messages([user_msg, message])
                self.message_sent.emit(message)
                await self.connections.route_message(message, wait=wait_for_connections)
                return message
        
            @abstractmethod
            def _run(
                self,
                *prompts: Any,
                **kwargs: Any,
            ) -> Coroutine[None, None, ChatMessage[TResult]]:
                """Implementation-specific run logic."""
        

        connection_stats property

        connection_stats: AggregatedTalkStats
        

        Get stats for all active connections of this node.

        context property

        context: NodeContext
        

        Get node context.

        message_received class-attribute instance-attribute

        message_received = Signal(ChatMessage)
        

        Signal emitted when node receives a message.

        message_sent class-attribute instance-attribute

        message_sent = Signal(ChatMessage)
        

        Signal emitted when node creates a message.

        name property writable

        name: str
        

        Get agent name.

        outbox class-attribute instance-attribute

        outbox = Signal(object)
        

        Signal emitted when node produces a message.

        stats abstractmethod property

        Get stats for this node.

        tool_used class-attribute instance-attribute

        tool_used = Signal(ToolCallInfo)
        

        Signal emitted when node uses a tool.

        __aenter__ async

        __aenter__() -> Self
        

        Initialize base message node.

        Source code in src/llmling_agent/messaging/messageemitter.py
        82
        83
        84
        85
        86
        87
        88
        89
        90
        91
        92
        async def __aenter__(self) -> Self:
            """Initialize base message node."""
            try:
                await self._events.__aenter__()
                await self.mcp.__aenter__()
            except Exception as e:
                await self.__aexit__(type(e), e, e.__traceback__)
                msg = f"Failed to initialize {self.name}"
                raise RuntimeError(msg) from e
            else:
                return self
        

        __aexit__ async

        __aexit__(
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        )
        

        Clean up base resources.

        Source code in src/llmling_agent/messaging/messageemitter.py
         94
         95
         96
         97
         98
         99
        100
        101
        102
        103
        async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ):
            """Clean up base resources."""
            await self._events.cleanup()
            await self.mcp.__aexit__(exc_type, exc_val, exc_tb)
            await self.cleanup_tasks()
        

        __init__

        __init__(
            name: str | None = None,
            description: str | None = None,
            context: NodeContext | None = None,
            mcp_servers: Sequence[str | MCPServerConfig] | None = None,
            enable_logging: bool = True,
        )
        

        Initialize message node.

        Source code in src/llmling_agent/messaging/messageemitter.py
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        def __init__(
            self,
            name: str | None = None,
            description: str | None = None,
            context: NodeContext | None = None,
            mcp_servers: Sequence[str | MCPServerConfig] | None = None,
            enable_logging: bool = True,
        ):
            """Initialize message node."""
            super().__init__()
            from llmling_agent.mcp_server.manager import MCPManager
            from llmling_agent.messaging.connection_manager import ConnectionManager
            from llmling_agent.messaging.event_manager import EventManager
            from llmling_agent.messaging.node_logger import NodeLogger
        
            self._name = name or self.__class__.__name__
            self.description = description
            self.connections = ConnectionManager(self)
            self._events = EventManager(self, enable_events=True)
            servers = mcp_servers or []
            name = f"node_{self._name}"
            self.mcp = MCPManager(name, servers=servers, context=context, owner=self.name)
            self._logger = NodeLogger(self, enable_db_logging=enable_logging)
        

        __rshift__

        __rshift__(other: MessageNode[Any, Any] | ProcessorCallback[Any]) -> Talk[TResult]
        
        __rshift__(
            other: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
        ) -> TeamTalk[TResult]
        
        __rshift__(
            other: MessageNode[Any, Any]
            | ProcessorCallback[Any]
            | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
        ) -> Talk[Any] | TeamTalk[Any]
        

        Connect agent to another agent or group.

        Example

        agent >> other_agent # Connect to single agent agent >> (agent2 & agent3) # Connect to group agent >> "other_agent" # Connect by name (needs pool)

        Source code in src/llmling_agent/messaging/messageemitter.py
        141
        142
        143
        144
        145
        146
        147
        148
        149
        150
        151
        152
        153
        154
        def __rshift__(
            self,
            other: MessageNode[Any, Any]
            | ProcessorCallback[Any]
            | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
        ) -> Talk[Any] | TeamTalk[Any]:
            """Connect agent to another agent or group.
        
            Example:
                agent >> other_agent  # Connect to single agent
                agent >> (agent2 & agent3)  # Connect to group
                agent >> "other_agent"  # Connect by name (needs pool)
            """
            return self.connect_to(other)
        

        _run abstractmethod

        _run(*prompts: Any, **kwargs: Any) -> Coroutine[None, None, ChatMessage[TResult]]
        

        Implementation-specific run logic.

        Source code in src/llmling_agent/messaging/messageemitter.py
        389
        390
        391
        392
        393
        394
        395
        @abstractmethod
        def _run(
            self,
            *prompts: Any,
            **kwargs: Any,
        ) -> Coroutine[None, None, ChatMessage[TResult]]:
            """Implementation-specific run logic."""
        

        connect_to

        connect_to(
            target: MessageNode[Any, Any] | ProcessorCallback[Any],
            *,
            queued: Literal[True],
            queue_strategy: Literal["concat"],
        ) -> Talk[str]
        
        connect_to(
            target: MessageNode[Any, Any] | ProcessorCallback[Any],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
        ) -> Talk[TResult]
        
        connect_to(
            target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            *,
            queued: Literal[True],
            queue_strategy: Literal["concat"],
        ) -> TeamTalk[str]
        
        connect_to(
            target: Sequence[MessageNode[Any, TResult] | ProcessorCallback[TResult]],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
        ) -> TeamTalk[TResult]
        
        connect_to(
            target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
        ) -> TeamTalk
        
        connect_to(
            target: MessageNode[Any, Any]
            | ProcessorCallback[Any]
            | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
        ) -> Talk[Any] | TeamTalk
        

        Create connection(s) to target(s).

        Source code in src/llmling_agent/messaging/messageemitter.py
        225
        226
        227
        228
        229
        230
        231
        232
        233
        234
        235
        236
        237
        238
        239
        240
        241
        242
        243
        244
        245
        246
        247
        248
        249
        250
        251
        252
        253
        254
        255
        256
        257
        258
        259
        260
        261
        262
        263
        264
        265
        266
        267
        268
        269
        270
        271
        272
        273
        274
        275
        276
        277
        278
        279
        280
        281
        282
        283
        284
        285
        286
        287
        288
        289
        def connect_to(
            self,
            target: MessageNode[Any, Any]
            | ProcessorCallback[Any]
            | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
        ) -> Talk[Any] | TeamTalk:
            """Create connection(s) to target(s)."""
            # Handle callable case
            from llmling_agent import MessageNode
            from llmling_agent.agent import Agent, StructuredAgent
            from llmling_agent.delegation.base_team import BaseTeam
        
            if callable(target):
                if has_return_type(target, str):
                    target = Agent.from_callback(target)
                else:
                    target = StructuredAgent.from_callback(target)
                if pool := self.context.pool:
                    pool.register(target.name, target)
            # we are explicit here just to make disctinction clear, we only want sequences
            # of message units
            if isinstance(target, Sequence) and not isinstance(target, BaseTeam):
                targets: list[MessageNode] = []
                for t in target:
                    match t:
                        case _ if callable(t):
                            if has_return_type(t, str):
                                other: MessageNode = Agent.from_callback(t)
                            else:
                                other = StructuredAgent.from_callback(t)
                            if pool := self.context.pool:
                                pool.register(other.name, other)
                            targets.append(other)
                        case MessageNode():
                            targets.append(t)
                        case _:
                            msg = f"Invalid node type: {type(t)}"
                            raise TypeError(msg)
            else:
                targets = target  # type: ignore
            return self.connections.create_connection(
                self,
                targets,
                connection_type=connection_type,
                priority=priority,
                name=name,
                delay=delay,
                queued=queued,
                queue_strategy=queue_strategy,
                transform=transform,
                filter_condition=filter_condition,
                stop_condition=stop_condition,
                exit_condition=exit_condition,
            )
        

        disconnect_all async

        disconnect_all()
        

        Disconnect from all nodes.

        Source code in src/llmling_agent/messaging/messageemitter.py
        291
        292
        293
        294
        async def disconnect_all(self):
            """Disconnect from all nodes."""
            for target in list(self.connections.get_targets()):
                self.stop_passing_results_to(target)
        

        pre_run async

        pre_run(
            *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
        ) -> tuple[ChatMessage[Any], list[Content | str]]
        

        Hook to prepare a MessgeNode run call.

        Parameters:

        Name Type Description Default
        *prompt AnyPromptType | Image | PathLike[str] | ChatMessage

        The prompt(s) to prepare.

        ()

        Returns:

        Type Description
        tuple[ChatMessage[Any], list[Content | str]]

        A tuple of: - Either incoming message, or a constructed incoming message based on the prompt(s). - A list of prompts to be sent to the model.

        Source code in src/llmling_agent/messaging/messageemitter.py
        300
        301
        302
        303
        304
        305
        306
        307
        308
        309
        310
        311
        312
        313
        314
        315
        316
        317
        318
        319
        320
        321
        322
        323
        324
        325
        326
        327
        328
        329
        330
        331
        332
        333
        334
        async def pre_run(
            self,
            *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
        ) -> tuple[ChatMessage[Any], list[Content | str]]:
            """Hook to prepare a MessgeNode run call.
        
            Args:
                *prompt: The prompt(s) to prepare.
        
            Returns:
                A tuple of:
                    - Either incoming message, or a constructed incoming message based
                      on the prompt(s).
                    - A list of prompts to be sent to the model.
            """
            if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                user_msg = prompt[0]
                prompts = await convert_prompts([user_msg.content])
                # Update received message's chain to show it came through its source
                user_msg = user_msg.forwarded(prompt[0])
                # change role since "perspective" changes, clear cost to avoid counting twice
                user_msg = replace(user_msg, role="user", cost_info=None)
                final_prompt = "\n\n".join(str(p) for p in prompts)
            else:
                prompts = await convert_prompts(prompt)
                final_prompt = "\n\n".join(str(p) for p in prompts)
                # use format_prompts?
                user_msg = ChatMessage[str](
                    content=final_prompt,
                    role="user",
                    conversation_id=str(uuid4()),
                )
            self.message_received.emit(user_msg)
            self.context.current_prompt = final_prompt
            return user_msg, prompts
        

        run async

        run(
            *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
            wait_for_connections: bool | None = None,
            store_history: bool = True,
            **kwargs: Any,
        ) -> ChatMessage[TResult]
        

        Execute node with prompts and handle message routing.

        Parameters:

        Name Type Description Default
        prompt AnyPromptType | Image | PathLike[str] | ChatMessage

        Input prompts

        ()
        wait_for_connections bool | None

        Whether to wait for forwarded messages

        None
        store_history bool

        Whether to store in conversation history

        True
        **kwargs Any

        Additional arguments for _run

        {}
        Source code in src/llmling_agent/messaging/messageemitter.py
        354
        355
        356
        357
        358
        359
        360
        361
        362
        363
        364
        365
        366
        367
        368
        369
        370
        371
        372
        373
        374
        375
        376
        377
        378
        379
        380
        381
        382
        383
        384
        385
        386
        387
        async def run(
            self,
            *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
            wait_for_connections: bool | None = None,
            store_history: bool = True,
            **kwargs: Any,
        ) -> ChatMessage[TResult]:
            """Execute node with prompts and handle message routing.
        
            Args:
                prompt: Input prompts
                wait_for_connections: Whether to wait for forwarded messages
                store_history: Whether to store in conversation history
                **kwargs: Additional arguments for _run
            """
            from llmling_agent import Agent, StructuredAgent
        
            user_msg, prompts = await self.pre_run(*prompt)
            message = await self._run(
                *prompts,
                store_history=store_history,
                conversation_id=user_msg.conversation_id,
                **kwargs,
            )
        
            # For chain processing, update the response's chain
            if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                message = message.forwarded(prompt[0])
        
            if store_history and isinstance(self, Agent | StructuredAgent):
                self.conversation.add_chat_messages([user_msg, message])
            self.message_sent.emit(message)
            await self.connections.route_message(message, wait=wait_for_connections)
            return message
        

        stop_passing_results_to

        stop_passing_results_to(other: MessageNode)
        

        Stop forwarding results to another node.

        Source code in src/llmling_agent/messaging/messageemitter.py
        296
        297
        298
        def stop_passing_results_to(self, other: MessageNode):
            """Stop forwarding results to another node."""
            self.connections.disconnect(other)