Skip to content

messagenode

Class info

Classes

Name Children Inherits
AggregatedTalkStats
llmling_agent.talk.stats
Statistics aggregated from multiple connections.
    ChatMessage
    llmling_agent.messaging.messages
    Common message format for all UI types.
      MessageNode
      llmling_agent.messaging.messagenode
      Base class for all message processing nodes.
      TaskManager
      llmling_agent.utils.tasks
      Mixin for managing async tasks.

        🛈 DocStrings

        Base class for message processing nodes.

        MessageNode

        Bases: ABC

        Base class for all message processing nodes.

        Source code in src/llmling_agent/messaging/messagenode.py
         41
         42
         43
         44
         45
         46
         47
         48
         49
         50
         51
         52
         53
         54
         55
         56
         57
         58
         59
         60
         61
         62
         63
         64
         65
         66
         67
         68
         69
         70
         71
         72
         73
         74
         75
         76
         77
         78
         79
         80
         81
         82
         83
         84
         85
         86
         87
         88
         89
         90
         91
         92
         93
         94
         95
         96
         97
         98
         99
        100
        101
        102
        103
        104
        105
        106
        107
        108
        109
        110
        111
        112
        113
        114
        115
        116
        117
        118
        119
        120
        121
        122
        123
        124
        125
        126
        127
        128
        129
        130
        131
        132
        133
        134
        135
        136
        137
        138
        139
        140
        141
        142
        143
        144
        145
        146
        147
        148
        149
        150
        151
        152
        153
        154
        155
        156
        157
        158
        159
        160
        161
        162
        163
        164
        165
        166
        167
        168
        169
        170
        171
        172
        173
        174
        175
        176
        177
        178
        179
        180
        181
        182
        183
        184
        185
        186
        187
        188
        189
        190
        191
        192
        193
        194
        195
        196
        197
        198
        199
        200
        201
        202
        203
        204
        205
        206
        207
        208
        209
        210
        211
        212
        213
        214
        215
        216
        217
        218
        219
        220
        221
        222
        223
        224
        225
        226
        227
        228
        229
        230
        231
        232
        233
        234
        235
        236
        237
        238
        239
        240
        241
        242
        243
        244
        245
        246
        247
        248
        249
        250
        251
        252
        253
        254
        255
        256
        257
        258
        259
        260
        261
        262
        263
        264
        265
        266
        267
        268
        269
        270
        271
        272
        273
        274
        275
        276
        277
        278
        279
        280
        281
        282
        283
        284
        285
        286
        287
        288
        289
        290
        291
        292
        293
        294
        295
        296
        297
        298
        299
        300
        301
        302
        303
        304
        305
        306
        307
        308
        309
        310
        311
        312
        313
        314
        315
        316
        317
        class MessageNode[TDeps, TResult](ABC):
            """Base class for all message processing nodes.

            A node owns a task manager, a connection manager, an event manager and an
            MCP manager (all created in ``__init__``) and is used as an async context
            manager to enter and exit the event and MCP managers.

            Subclasses must implement ``run``, ``run_iter`` and ``get_stats``. The base
            ``context`` property raises ``NotImplementedError``, so subclasses are
            expected to override it as well.

            Type parameters:
                TDeps: Not referenced inside this class body; presumably the dependency
                    type used by subclasses -- TODO confirm.
                TResult: Content type of the ``ChatMessage`` returned by ``run``.
            """

            message_received = Signal(ChatMessage)
            """Signal emitted when node receives a message."""

            message_sent = Signal(ChatMessage)
            """Signal emitted when node creates a message."""

            def __init__(
                self,
                name: str | None = None,
                description: str | None = None,
                input_provider: InputProvider | None = None,
                mcp_servers: Sequence[str | MCPServerConfig] | None = None,
                agent_pool: AgentPool[Any] | None = None,
                enable_logging: bool = True,
            ) -> None:
                """Initialize message node.

                Args:
                    name: Node name. Falls back to the class name when omitted or empty.
                    description: Stored unchanged as ``self.description``.
                    input_provider: Forwarded to the ``MCPManager``.
                    mcp_servers: Forwarded to the ``MCPManager`` as ``servers``.
                    agent_pool: Stored unchanged as ``self.agent_pool``.
                    enable_logging: Stored as ``self.enable_db_logging``; gates the
                        storage calls made by ``__aenter__``, ``get_message_history``
                        and ``log_message``.
                """
                super().__init__()
                # Function-local imports; presumably to avoid import cycles -- confirm.
                from llmling_agent.mcp_server.manager import MCPManager
                from llmling_agent.messaging import EventManager
                from llmling_agent.messaging.connection_manager import ConnectionManager

                self.task_manager = TaskManager()
                self._name = name or self.__class__.__name__
                self.log = logger.bind(agent_name=self._name)
                self.agent_pool = agent_pool
                self.description = description
                # Both managers receive this (partially initialised) node, so the
                # attributes assigned above must stay ahead of these two lines.
                self.connections = ConnectionManager(self)
                self._events = EventManager(self, enable_events=True)
                name_ = f"node_{self._name}"
                self.mcp = MCPManager(
                    name_, servers=mcp_servers, input_provider=input_provider, owner=self.name
                )
                self.enable_db_logging = enable_logging
                # Fresh random id per node instance; used as the storage conversation key.
                self.conversation_id = str(uuid4())
                # Connect to the combined signal to capture all messages
                # TODO: need to check this
                # node.message_received.connect(self.log_message)

            async def __aenter__(self) -> Self:
                """Initialize base message node.

                Logs the conversation to storage when DB logging is enabled, then enters
                the event manager followed by the MCP manager.

                Returns:
                    This node.

                Raises:
                    RuntimeError: If entering the event or MCP manager raises an
                        ``Exception``. ``__aexit__`` is awaited first and the original
                        error is chained as the cause.
                """
                # The storage call sits outside the try block, so its errors propagate
                # unchanged and do not trigger the cleanup below.
                if self.enable_db_logging:
                    await self.context.storage.log_conversation(
                        conversation_id=self.conversation_id,
                        node_name=self.name,
                    )
                try:
                    await self._events.__aenter__()
                    await self.mcp.__aenter__()
                except Exception as e:
                    await self.__aexit__(type(e), e, e.__traceback__)
                    msg = f"Failed to initialize {self.name}"
                    raise RuntimeError(msg) from e
                else:
                    return self

            async def __aexit__(
                self,
                exc_type: type[BaseException] | None,
                exc_val: BaseException | None,
                exc_tb: TracebackType | None,
            ) -> None:
                """Clean up base resources.

                Exits the event manager, then the MCP manager, then cleans up the tasks
                held by the task manager. The exception triple is forwarded to both
                managers.
                """
                # NOTE(review): the three awaits are sequential without try/finally, so
                # an exception from an earlier step skips the later ones.
                await self._events.__aexit__(exc_type, exc_val, exc_tb)
                await self.mcp.__aexit__(exc_type, exc_val, exc_tb)
                await self.task_manager.cleanup_tasks()

            @property
            def connection_stats(self) -> AggregatedTalkStats:
                """Get stats for all active connections of this node.

                Collects ``talk.stats`` from every connection reported by the connection
                manager and wraps them in an ``AggregatedTalkStats``.
                """
                stats = [talk.stats for talk in self.connections.get_connections()]
                return AggregatedTalkStats(stats=stats)

            @property
            def context(self) -> NodeContext:
                """Get node context.

                Raises:
                    NotImplementedError: Always, in this base class; subclasses are
                        expected to override the property.
                """
                raise NotImplementedError

            @property
            def name(self) -> str:
                """Get agent name, or ``"llmling-agent"`` if the stored name is empty."""
                return self._name or "llmling-agent"

            @name.setter
            def name(self, value: str) -> None:
                # Only the private name is replaced; ``self.log`` and ``self.mcp`` were
                # built from the initial name and are not updated here.
                self._name = value

            @overload
            def __rshift__(
                self, other: MessageNode[Any, Any] | ProcessorCallback[Any]
            ) -> Talk[TResult]: ...

            @overload
            def __rshift__(
                self, other: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]]
            ) -> TeamTalk[TResult]: ...

            def __rshift__(
                self,
                other: MessageNode[Any, Any]
                | ProcessorCallback[Any]
                | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            ) -> Talk[Any] | TeamTalk[Any]:
                """Connect agent to another agent or group.

                Shorthand for ``self.connect_to(other)`` with default options.

                Example:
                    agent >> other_agent  # Connect to single agent
                    agent >> (agent2 & agent3)  # Connect to group
                    agent >> "other_agent"  # Connect by name (needs pool)

                NOTE(review): ``connect_to`` has no branch that resolves names; a ``str``
                is treated as a sequence and ends in the "Invalid node type" TypeError.
                Confirm whether the by-name example is still valid.
                """
                return self.connect_to(other)

            @overload
            def connect_to(
                self,
                target: MessageNode[Any, Any] | ProcessorCallback[Any],
                *,
                queued: Literal[True],
                queue_strategy: Literal["concat"],
            ) -> Talk[str]: ...

            @overload
            def connect_to(
                self,
                target: MessageNode[Any, Any] | ProcessorCallback[Any],
                *,
                connection_type: ConnectionType = "run",
                name: str | None = None,
                priority: int = 0,
                delay: timedelta | None = None,
                queued: bool = False,
                queue_strategy: QueueStrategy = "latest",
                transform: AnyTransformFn[Any] | None = None,
                filter_condition: AsyncFilterFn | None = None,
                stop_condition: AsyncFilterFn | None = None,
                exit_condition: AsyncFilterFn | None = None,
            ) -> Talk[TResult]: ...

            @overload
            def connect_to(
                self,
                target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                *,
                queued: Literal[True],
                queue_strategy: Literal["concat"],
            ) -> TeamTalk[str]: ...

            @overload
            def connect_to(
                self,
                target: Sequence[MessageNode[Any, TResult] | ProcessorCallback[TResult]],
                *,
                connection_type: ConnectionType = "run",
                name: str | None = None,
                priority: int = 0,
                delay: timedelta | None = None,
                queued: bool = False,
                queue_strategy: QueueStrategy = "latest",
                transform: AnyTransformFn[Any] | None = None,
                filter_condition: AsyncFilterFn | None = None,
                stop_condition: AsyncFilterFn | None = None,
                exit_condition: AsyncFilterFn | None = None,
            ) -> TeamTalk[TResult]: ...

            @overload
            def connect_to(
                self,
                target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                *,
                connection_type: ConnectionType = "run",
                name: str | None = None,
                priority: int = 0,
                delay: timedelta | None = None,
                queued: bool = False,
                queue_strategy: QueueStrategy = "latest",
                transform: AnyTransformFn[Any] | None = None,
                filter_condition: AsyncFilterFn | None = None,
                stop_condition: AsyncFilterFn | None = None,
                exit_condition: AsyncFilterFn | None = None,
            ) -> TeamTalk: ...

            def connect_to(
                self,
                target: MessageNode[Any, Any]
                | ProcessorCallback[Any]
                | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                *,
                connection_type: ConnectionType = "run",
                name: str | None = None,
                priority: int = 0,
                delay: timedelta | None = None,
                queued: bool = False,
                queue_strategy: QueueStrategy = "latest",
                transform: AnyTransformFn[Any] | None = None,
                filter_condition: AsyncFilterFn | None = None,
                stop_condition: AsyncFilterFn | None = None,
                exit_condition: AsyncFilterFn | None = None,
            ) -> Talk[Any] | TeamTalk:
                """Create connection(s) to target(s).

                Callables (a single target or elements of a sequence) are wrapped with
                ``Agent.from_callback`` and registered in ``self.context.pool`` when a
                pool is available. The resulting target(s) and all keyword options are
                forwarded unchanged to ``self.connections.create_connection``.

                Args:
                    target: A node, a callable, or a sequence of nodes/callables.
                    connection_type: Forwarded to ``create_connection``.
                    name: Forwarded to ``create_connection``.
                    priority: Forwarded to ``create_connection``.
                    delay: Forwarded to ``create_connection``.
                    queued: Forwarded to ``create_connection``.
                    queue_strategy: Forwarded to ``create_connection``.
                    transform: Forwarded to ``create_connection``.
                    filter_condition: Forwarded to ``create_connection``.
                    stop_condition: Forwarded to ``create_connection``.
                    exit_condition: Forwarded to ``create_connection``.

                Returns:
                    Whatever ``create_connection`` returns for the resolved target(s).

                Raises:
                    TypeError: If a sequence element is neither callable nor a
                        ``MessageNode``.
                """
                # Handle callable case
                from llmling_agent.agent import Agent
                from llmling_agent.delegation.base_team import BaseTeam

                if callable(target):
                    target = Agent.from_callback(target)
                    if pool := self.context.pool:
                        pool.register(target.name, target)
                # We are explicit here to make the distinction clear: we only want
                # sequences of message units. A BaseTeam is excluded from this branch
                # and passed through as a single target.
                if isinstance(target, Sequence) and not isinstance(target, BaseTeam):
                    targets: list[MessageNode[Any, Any]] = []
                    for t in target:
                        match t:
                            # Callables are checked first, before the MessageNode case.
                            case _ if callable(t):
                                other = Agent.from_callback(t)
                                if pool := self.context.pool:
                                    pool.register(other.name, other)
                                targets.append(other)
                            case MessageNode():
                                targets.append(t)
                            case _:
                                msg = f"Invalid node type: {type(t)}"
                                raise TypeError(msg)
                else:
                    targets = target  # type: ignore
                return self.connections.create_connection(
                    self,
                    targets,
                    connection_type=connection_type,
                    priority=priority,
                    name=name,
                    delay=delay,
                    queued=queued,
                    queue_strategy=queue_strategy,
                    transform=transform,
                    filter_condition=filter_condition,
                    stop_condition=stop_condition,
                    exit_condition=exit_condition,
                )

            async def disconnect_all(self) -> None:
                """Disconnect from all nodes.

                Calls ``stop_passing_results_to`` for every target currently reported
                by the connection manager.
                """
                # Iterate over a copy so the loop does not depend on the collection
                # returned by get_targets() while connections are being removed.
                for target in list(self.connections.get_targets()):
                    self.stop_passing_results_to(target)

            def stop_passing_results_to(self, other: MessageNode[Any, Any]) -> None:
                """Stop forwarding results to another node.

                Args:
                    other: Node to disconnect via the connection manager.
                """
                self.connections.disconnect(other)

            @abstractmethod
            async def run(self, *prompts: Any, **kwargs: Any) -> ChatMessage[TResult]:
                """Execute node with prompts. Implementation-specific run logic."""

            async def get_message_history(self, limit: int | None = None) -> list[ChatMessage[Any]]:
                """Get message history from storage.

                Args:
                    limit: Passed to the ``SessionQuery`` as ``limit``.

                Returns:
                    An empty list when DB logging is disabled; otherwise the result of
                    ``self.context.storage.filter_messages`` for a query keyed on this
                    node's ``conversation_id``.
                """
                if not self.enable_db_logging:
                    return []  # No history if not logging

                from llmling_agent_config.session import SessionQuery

                query = SessionQuery(name=self.conversation_id, limit=limit)
                return await self.context.storage.filter_messages(query)

            async def log_message(self, message: ChatMessage[Any]) -> None:
                """Handle message from chat signal.

                Writes the message to storage when DB logging is enabled; otherwise
                does nothing.
                """
                if self.enable_db_logging:
                    await self.context.storage.log_message(message)  # pyright: ignore

            @abstractmethod
            async def get_stats(self) -> MessageStats | AggregatedMessageStats:
                """Get message statistics for this node."""

            @abstractmethod
            def run_iter(self, *prompts: Any, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]:
                """Yield messages during execution.

                Declared as a plain ``def`` returning an ``AsyncIterator``.
                """
        

        connection_stats property

        connection_stats: AggregatedTalkStats
        

        Get stats for all active connections of this node.

        context property

        context: NodeContext
        

        Get node context.

        message_received class-attribute instance-attribute

        message_received = Signal(ChatMessage)
        

        Signal emitted when node receives a message.

        message_sent class-attribute instance-attribute

        message_sent = Signal(ChatMessage)
        

        Signal emitted when node creates a message.

        name property writable

        name: str
        

        Get agent name.

        __aenter__ async

        __aenter__() -> Self
        

        Initialize base message node.

        Source code in src/llmling_agent/messaging/messagenode.py
        82
        83
        84
        85
        86
        87
        88
        89
        90
        91
        92
        93
        94
        95
        96
        97
        async def __aenter__(self) -> Self:
            """Initialize base message node."""
            if self.enable_db_logging:
                await self.context.storage.log_conversation(
                    conversation_id=self.conversation_id,
                    node_name=self.name,
                )
            try:
                await self._events.__aenter__()
                await self.mcp.__aenter__()
            except Exception as e:
                await self.__aexit__(type(e), e, e.__traceback__)
                msg = f"Failed to initialize {self.name}"
                raise RuntimeError(msg) from e
            else:
                return self
        

        __aexit__ async

        __aexit__(
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None
        

        Clean up base resources.

        Source code in src/llmling_agent/messaging/messagenode.py
         99
        100
        101
        102
        103
        104
        105
        106
        107
        108
        async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None:
            """Clean up base resources."""
            await self._events.__aexit__(exc_type, exc_val, exc_tb)
            await self.mcp.__aexit__(exc_type, exc_val, exc_tb)
            await self.task_manager.cleanup_tasks()
        

        __init__

        __init__(
            name: str | None = None,
            description: str | None = None,
            input_provider: InputProvider | None = None,
            mcp_servers: Sequence[str | MCPServerConfig] | None = None,
            agent_pool: AgentPool[Any] | None = None,
            enable_logging: bool = True,
        ) -> None
        

        Initialize message node.

        Source code in src/llmling_agent/messaging/messagenode.py
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        def __init__(
            self,
            name: str | None = None,
            description: str | None = None,
            input_provider: InputProvider | None = None,
            mcp_servers: Sequence[str | MCPServerConfig] | None = None,
            agent_pool: AgentPool[Any] | None = None,
            enable_logging: bool = True,
        ) -> None:
            """Initialize message node.

            Args:
                name: Node name. Falls back to the class name when omitted or empty.
                description: Stored unchanged as ``self.description``.
                input_provider: Forwarded to the ``MCPManager``.
                mcp_servers: Forwarded to the ``MCPManager`` as ``servers``.
                agent_pool: Stored unchanged as ``self.agent_pool``.
                enable_logging: Stored as ``self.enable_db_logging``.
            """
            super().__init__()
            # Function-local imports; presumably to avoid import cycles -- confirm.
            from llmling_agent.mcp_server.manager import MCPManager
            from llmling_agent.messaging import EventManager
            from llmling_agent.messaging.connection_manager import ConnectionManager

            self.task_manager = TaskManager()
            self._name = name or self.__class__.__name__
            self.log = logger.bind(agent_name=self._name)
            self.agent_pool = agent_pool
            self.description = description
            # Both managers receive this (partially initialised) node, so the
            # attributes assigned above must stay ahead of these two lines.
            self.connections = ConnectionManager(self)
            self._events = EventManager(self, enable_events=True)
            name_ = f"node_{self._name}"
            self.mcp = MCPManager(
                name_, servers=mcp_servers, input_provider=input_provider, owner=self.name
            )
            self.enable_db_logging = enable_logging
            # Fresh random id per node instance.
            self.conversation_id = str(uuid4())
        

        __rshift__

        __rshift__(other: MessageNode[Any, Any] | ProcessorCallback[Any]) -> Talk[TResult]
        
        __rshift__(
            other: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
        ) -> TeamTalk[TResult]
        
        __rshift__(
            other: (
                MessageNode[Any, Any]
                | ProcessorCallback[Any]
                | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]]
            ),
        ) -> Talk[Any] | TeamTalk[Any]
        

        Connect agent to another agent or group.

        Example

        agent >> other_agent  # Connect to single agent
        agent >> (agent2 & agent3)  # Connect to group
        agent >> "other_agent"  # Connect by name (needs pool)

        Source code in src/llmling_agent/messaging/messagenode.py
        140
        141
        142
        143
        144
        145
        146
        147
        148
        149
        150
        151
        152
        153
        def __rshift__(
            self,
            other: MessageNode[Any, Any]
            | ProcessorCallback[Any]
            | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
        ) -> Talk[Any] | TeamTalk[Any]:
            """Wire this node's output to ``other`` using the ``>>`` operator.

            Equivalent to calling ``self.connect_to(other)`` with default options.

            Example:
                agent >> other_agent  # Connect to single agent
                agent >> (agent2 & agent3)  # Connect to group

            NOTE(review): earlier documentation also listed ``agent >> "other_agent"``
            (connect by name). The annotation here does not accept ``str``; confirm
            whether ``connect_to`` still supports names before relying on it.
            """
            connection = self.connect_to(other)
            return connection
        

        connect_to

        connect_to(
            target: MessageNode[Any, Any] | ProcessorCallback[Any],
            *,
            queued: Literal[True],
            queue_strategy: Literal["concat"]
        ) -> Talk[str]
        
        connect_to(
            target: MessageNode[Any, Any] | ProcessorCallback[Any],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn[Any] | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None
        ) -> Talk[TResult]
        
        connect_to(
            target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            *,
            queued: Literal[True],
            queue_strategy: Literal["concat"]
        ) -> TeamTalk[str]
        
        connect_to(
            target: Sequence[MessageNode[Any, TResult] | ProcessorCallback[TResult]],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn[Any] | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None
        ) -> TeamTalk[TResult]
        
        connect_to(
            target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn[Any] | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None
        ) -> TeamTalk
        
        connect_to(
            target: (
                MessageNode[Any, Any]
                | ProcessorCallback[Any]
                | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]]
            ),
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn[Any] | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None
        ) -> Talk[Any] | TeamTalk
        

        Create connection(s) to target(s).

        Source code in src/llmling_agent/messaging/messagenode.py
        224
        225
        226
        227
        228
        229
        230
        231
        232
        233
        234
        235
        236
        237
        238
        239
        240
        241
        242
        243
        244
        245
        246
        247
        248
        249
        250
        251
        252
        253
        254
        255
        256
        257
        258
        259
        260
        261
        262
        263
        264
        265
        266
        267
        268
        269
        270
        271
        272
        273
        274
        275
        276
        277
        278
        279
        280
        281
        def connect_to(
            self,
            target: MessageNode[Any, Any]
            | ProcessorCallback[Any]
            | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn[Any] | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
        ) -> Talk[Any] | TeamTalk:
            """Connect this node to one target or to a sequence of targets.

            Callables (given directly or as sequence elements) are turned into agents
            with ``Agent.from_callback`` and registered in ``self.context.pool`` when
            a pool is set. All keyword options are handed unchanged to
            ``self.connections.create_connection``.

            Args:
                target: A node, a callable, or a sequence of nodes/callables.
                connection_type: Forwarded to ``create_connection``.
                name: Forwarded to ``create_connection``.
                priority: Forwarded to ``create_connection``.
                delay: Forwarded to ``create_connection``.
                queued: Forwarded to ``create_connection``.
                queue_strategy: Forwarded to ``create_connection``.
                transform: Forwarded to ``create_connection``.
                filter_condition: Forwarded to ``create_connection``.
                stop_condition: Forwarded to ``create_connection``.
                exit_condition: Forwarded to ``create_connection``.

            Returns:
                The connection object produced by ``create_connection``.

            Raises:
                TypeError: If a sequence element is neither callable nor a
                    ``MessageNode``.
            """
            from llmling_agent.agent import Agent
            from llmling_agent.delegation.base_team import BaseTeam

            def _agent_from(callback: Any) -> Any:
                # Wrap a plain callable as an agent and register it if a pool exists.
                wrapped = Agent.from_callback(callback)
                if registry := self.context.pool:
                    registry.register(wrapped.name, wrapped)
                return wrapped

            if callable(target):
                target = _agent_from(target)

            # Only genuine sequences of message units are expanded element by
            # element; a BaseTeam is passed through as a single target.
            is_plain_sequence = isinstance(target, Sequence) and not isinstance(target, BaseTeam)
            if is_plain_sequence:
                targets: list[MessageNode[Any, Any]] = []
                for item in target:
                    # Callables take precedence over the MessageNode check.
                    if callable(item):
                        targets.append(_agent_from(item))
                    elif isinstance(item, MessageNode):
                        targets.append(item)
                    else:
                        msg = f"Invalid node type: {type(item)}"
                        raise TypeError(msg)
            else:
                targets = target  # type: ignore

            options = {
                "connection_type": connection_type,
                "priority": priority,
                "name": name,
                "delay": delay,
                "queued": queued,
                "queue_strategy": queue_strategy,
                "transform": transform,
                "filter_condition": filter_condition,
                "stop_condition": stop_condition,
                "exit_condition": exit_condition,
            }
            return self.connections.create_connection(self, targets, **options)
        

        disconnect_all async

        disconnect_all() -> None
        

        Disconnect from all nodes.

        Source code in src/llmling_agent/messaging/messagenode.py
        283
        284
        285
        286
        async def disconnect_all(self) -> None:
            """Drop every outgoing connection of this node.

            Takes a snapshot of the current targets and calls
            ``stop_passing_results_to`` for each of them in order.
            """
            # Snapshot first so removals cannot disturb the iteration.
            current_targets = tuple(self.connections.get_targets())
            for node in current_targets:
                self.stop_passing_results_to(node)
        

        get_message_history async

        get_message_history(limit: int | None = None) -> list[ChatMessage[Any]]
        

        Get message history from storage.

        Source code in src/llmling_agent/messaging/messagenode.py
        296
        297
        298
        299
        300
        301
        302
        303
        304
        async def get_message_history(self, limit: int | None = None) -> list[ChatMessage[Any]]:
            """Fetch this node's stored messages.

            Args:
                limit: Passed to the ``SessionQuery`` as ``limit``.

            Returns:
                An empty list when DB logging is disabled; otherwise the result of
                ``self.context.storage.filter_messages`` for a query keyed on this
                node's ``conversation_id``.
            """
            if self.enable_db_logging:
                from llmling_agent_config.session import SessionQuery

                session_query = SessionQuery(name=self.conversation_id, limit=limit)
                storage = self.context.storage
                return await storage.filter_messages(session_query)

            # Nothing was ever logged, so there is no history to return.
            return []
        

        get_stats abstractmethod async

        Get message statistics for this node.

        Source code in src/llmling_agent/messaging/messagenode.py
        311
        312
        313
        @abstractmethod
        async def get_stats(self) -> MessageStats | AggregatedMessageStats:
            """Get message statistics for this node.

            Abstract: concrete nodes must provide the implementation.
            """
        

        log_message async

        log_message(message: ChatMessage[Any]) -> None
        

        Handle message from chat signal.

        Source code in src/llmling_agent/messaging/messagenode.py
        306
        307
        308
        309
        async def log_message(self, message: ChatMessage[Any]) -> None:
            """Persist a message received from the chat signal.

            The message is written to ``self.context.storage`` when DB logging is
            enabled; otherwise the call is a no-op.

            Args:
                message: Message to store.
            """
            if not self.enable_db_logging:
                return
            storage = self.context.storage
            await storage.log_message(message)  # pyright: ignore
        

        run abstractmethod async

        run(*prompts: Any, **kwargs: Any) -> ChatMessage[TResult]
        

        Execute node with prompts. Implementation-specific run logic.

        Source code in src/llmling_agent/messaging/messagenode.py
        292
        293
        294
        @abstractmethod
        async def run(self, *prompts: Any, **kwargs: Any) -> ChatMessage[TResult]:
            """Execute node with prompts. Implementation-specific run logic.

            Abstract: concrete nodes must provide the implementation.

            Args:
                *prompts: Prompts to process; accepted types are defined by the
                    implementing subclass.
                **kwargs: Implementation-specific options.

            Returns:
                A ``ChatMessage`` carrying content of type ``TResult``.
            """
        

        run_iter abstractmethod

        run_iter(*prompts: Any, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]
        

        Yield messages during execution.

        Source code in src/llmling_agent/messaging/messagenode.py
        315
        316
        317
        @abstractmethod
        def run_iter(self, *prompts: Any, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]:
            """Yield messages during execution.

            Abstract: concrete nodes must provide the implementation. Declared as a
            plain ``def`` returning an ``AsyncIterator``.

            Args:
                *prompts: Prompts to process; accepted types are defined by the
                    implementing subclass.
                **kwargs: Implementation-specific options.
            """
        

        stop_passing_results_to

        stop_passing_results_to(other: MessageNode[Any, Any]) -> None
        

        Stop forwarding results to another node.

        Source code in src/llmling_agent/messaging/messagenode.py
        288
        289
        290
        def stop_passing_results_to(self, other: MessageNode[Any, Any]) -> None:
            """Remove the connection from this node to ``other``.

            Args:
                other: Node to disconnect via the connection manager.
            """
            manager = self.connections
            manager.disconnect(other)