Skip to content

MessageNode

Sub classes

Name Children Inherits
Agent
llmling_agent.agent.agent
The main agent class.
    BaseTeam
    llmling_agent.delegation.base_team
    Base class for Team and TeamRun.

    Base classes

    Name Children Inherits
    ABC
    abc
    Helper class that provides a standard way to create an ABC using
    Generic
    typing
    Abstract base class for generic types.

    ⋔ Inheritance diagram

    graph TD
      94001345984832["messagenode.MessageNode"]
      94001297736352["abc.ABC"]
      140380010846688["builtins.object"]
      94001297341184["typing.Generic"]
      94001297736352 --> 94001345984832
      140380010846688 --> 94001297736352
      94001297341184 --> 94001345984832
      140380010846688 --> 94001297341184

    🛈 DocStrings

    Bases: ABC

    Base class for all message processing nodes.

    Source code in src/llmling_agent/messaging/messagenode.py
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    class MessageNode[TDeps, TResult](ABC):
        """Base class for all message processing nodes."""
    
        message_received = Signal(ChatMessage)
        """Signal emitted when node receives a message."""
    
        message_sent = Signal(ChatMessage)
        """Signal emitted when node creates a message."""
    
        def __init__(
            self,
            name: str | None = None,
            description: str | None = None,
            input_provider: InputProvider | None = None,
            mcp_servers: Sequence[str | MCPServerConfig] | None = None,
            agent_pool: AgentPool[Any] | None = None,
            enable_logging: bool = True,
        ) -> None:
            """Initialize message node."""
            super().__init__()
            from llmling_agent.mcp_server.manager import MCPManager
            from llmling_agent.messaging import EventManager
            from llmling_agent.messaging.connection_manager import ConnectionManager
    
            self.task_manager = TaskManager()
            self._name = name or self.__class__.__name__
            self.log = logger.bind(agent_name=self._name)
            self.agent_pool = agent_pool
            self.description = description
            self.connections = ConnectionManager(self)
            self._events = EventManager(self, enable_events=True)
            name_ = f"node_{self._name}"
            self.mcp = MCPManager(
                name_, servers=mcp_servers, input_provider=input_provider, owner=self.name
            )
            self.enable_db_logging = enable_logging
            self.conversation_id = str(uuid4())
            # Connect to the combined signal to capture all messages
            # TODO: need to check this
            # node.message_received.connect(self.log_message)
    
        async def __aenter__(self) -> Self:
            """Initialize base message node."""
            if self.enable_db_logging:
                await self.context.storage.log_conversation(
                    conversation_id=self.conversation_id,
                    node_name=self.name,
                )
            try:
                await self._events.__aenter__()
                await self.mcp.__aenter__()
            except Exception as e:
                await self.__aexit__(type(e), e, e.__traceback__)
                msg = f"Failed to initialize {self.name}"
                raise RuntimeError(msg) from e
            else:
                return self
    
        async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None:
            """Clean up base resources."""
            await self._events.__aexit__(exc_type, exc_val, exc_tb)
            await self.mcp.__aexit__(exc_type, exc_val, exc_tb)
            await self.task_manager.cleanup_tasks()
    
        @property
        def connection_stats(self) -> AggregatedTalkStats:
            """Get stats for all active connections of this node."""
            stats = [talk.stats for talk in self.connections.get_connections()]
            return AggregatedTalkStats(stats=stats)
    
        @property
        def context(self) -> NodeContext:
            """Get node context."""
            raise NotImplementedError
    
        @property
        def name(self) -> str:
            """Get agent name."""
            return self._name or "llmling-agent"
    
        @name.setter
        def name(self, value: str) -> None:
            self._name = value
    
        @overload
        def __rshift__(
            self, other: MessageNode[Any, Any] | ProcessorCallback[Any]
        ) -> Talk[TResult]: ...
    
        @overload
        def __rshift__(
            self, other: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]]
        ) -> TeamTalk[TResult]: ...
    
        def __rshift__(
            self,
            other: MessageNode[Any, Any]
            | ProcessorCallback[Any]
            | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
        ) -> Talk[Any] | TeamTalk[Any]:
            """Connect agent to another agent or group.
    
            Example:
                agent >> other_agent  # Connect to single agent
                agent >> (agent2 & agent3)  # Connect to group
                agent >> "other_agent"  # Connect by name (needs pool)
            """
            return self.connect_to(other)
    
        @overload
        def connect_to(
            self,
            target: MessageNode[Any, Any] | ProcessorCallback[Any],
            *,
            queued: Literal[True],
            queue_strategy: Literal["concat"],
        ) -> Talk[str]: ...
    
        @overload
        def connect_to(
            self,
            target: MessageNode[Any, Any] | ProcessorCallback[Any],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn[Any] | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
        ) -> Talk[TResult]: ...
    
        @overload
        def connect_to(
            self,
            target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            *,
            queued: Literal[True],
            queue_strategy: Literal["concat"],
        ) -> TeamTalk[str]: ...
    
        @overload
        def connect_to(
            self,
            target: Sequence[MessageNode[Any, TResult] | ProcessorCallback[TResult]],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn[Any] | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
        ) -> TeamTalk[TResult]: ...
    
        @overload
        def connect_to(
            self,
            target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn[Any] | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
        ) -> TeamTalk: ...
    
        def connect_to(
            self,
            target: MessageNode[Any, Any]
            | ProcessorCallback[Any]
            | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
            *,
            connection_type: ConnectionType = "run",
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn[Any] | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
        ) -> Talk[Any] | TeamTalk:
            """Create connection(s) to target(s)."""
            # Handle callable case
            from llmling_agent.agent import Agent
            from llmling_agent.delegation.base_team import BaseTeam
    
            if callable(target):
                target = Agent.from_callback(target)
                if pool := self.context.pool:
                    pool.register(target.name, target)
            # we are explicit here just to make disctinction clear, we only want sequences
            # of message units
            if isinstance(target, Sequence) and not isinstance(target, BaseTeam):
                targets: list[MessageNode[Any, Any]] = []
                for t in target:
                    match t:
                        case _ if callable(t):
                            other = Agent.from_callback(t)
                            if pool := self.context.pool:
                                pool.register(other.name, other)
                            targets.append(other)
                        case MessageNode():
                            targets.append(t)
                        case _:
                            msg = f"Invalid node type: {type(t)}"
                            raise TypeError(msg)
            else:
                targets = target  # type: ignore
            return self.connections.create_connection(
                self,
                targets,
                connection_type=connection_type,
                priority=priority,
                name=name,
                delay=delay,
                queued=queued,
                queue_strategy=queue_strategy,
                transform=transform,
                filter_condition=filter_condition,
                stop_condition=stop_condition,
                exit_condition=exit_condition,
            )
    
        async def disconnect_all(self) -> None:
            """Disconnect from all nodes."""
            for target in list(self.connections.get_targets()):
                self.stop_passing_results_to(target)
    
        def stop_passing_results_to(self, other: MessageNode[Any, Any]) -> None:
            """Stop forwarding results to another node."""
            self.connections.disconnect(other)
    
        @abstractmethod
        async def run(self, *prompts: Any, **kwargs: Any) -> ChatMessage[TResult]:
            """Execute node with prompts. Implementation-specific run logic."""
    
        async def get_message_history(self, limit: int | None = None) -> list[ChatMessage[Any]]:
            """Get message history from storage."""
            if not self.enable_db_logging:
                return []  # No history if not logging
    
            from llmling_agent_config.session import SessionQuery
    
            query = SessionQuery(name=self.conversation_id, limit=limit)
            return await self.context.storage.filter_messages(query)
    
        async def log_message(self, message: ChatMessage[Any]) -> None:
            """Handle message from chat signal."""
            if self.enable_db_logging:
                await self.context.storage.log_message(message)  # pyright: ignore
    
        @abstractmethod
        async def get_stats(self) -> MessageStats | AggregatedMessageStats:
            """Get message statistics for this node."""
    
        @abstractmethod
        def run_iter(self, *prompts: Any, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]:
            """Yield messages during execution."""
    

    connection_stats property

    connection_stats: AggregatedTalkStats
    

    Get stats for all active connections of this node.

    context property

    context: NodeContext
    

    Get node context.

    message_received class-attribute instance-attribute

    message_received = Signal(ChatMessage)
    

    Signal emitted when node receives a message.

    message_sent class-attribute instance-attribute

    message_sent = Signal(ChatMessage)
    

    Signal emitted when node creates a message.

    name property writable

    name: str
    

    Get agent name.

    __aenter__ async

    __aenter__() -> Self
    

    Initialize base message node.

    Source code in src/llmling_agent/messaging/messagenode.py
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    async def __aenter__(self) -> Self:
        """Initialize base message node."""
        if self.enable_db_logging:
            await self.context.storage.log_conversation(
                conversation_id=self.conversation_id,
                node_name=self.name,
            )
        try:
            await self._events.__aenter__()
            await self.mcp.__aenter__()
        except Exception as e:
            await self.__aexit__(type(e), e, e.__traceback__)
            msg = f"Failed to initialize {self.name}"
            raise RuntimeError(msg) from e
        else:
            return self
    

    __aexit__ async

    __aexit__(
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None
    

    Clean up base resources.

    Source code in src/llmling_agent/messaging/messagenode.py
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Clean up base resources."""
        await self._events.__aexit__(exc_type, exc_val, exc_tb)
        await self.mcp.__aexit__(exc_type, exc_val, exc_tb)
        await self.task_manager.cleanup_tasks()
    

    __init__

    __init__(
        name: str | None = None,
        description: str | None = None,
        input_provider: InputProvider | None = None,
        mcp_servers: Sequence[str | MCPServerConfig] | None = None,
        agent_pool: AgentPool[Any] | None = None,
        enable_logging: bool = True,
    ) -> None
    

    Initialize message node.

    Source code in src/llmling_agent/messaging/messagenode.py
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    def __init__(
        self,
        name: str | None = None,
        description: str | None = None,
        input_provider: InputProvider | None = None,
        mcp_servers: Sequence[str | MCPServerConfig] | None = None,
        agent_pool: AgentPool[Any] | None = None,
        enable_logging: bool = True,
    ) -> None:
        """Initialize message node."""
        super().__init__()
        from llmling_agent.mcp_server.manager import MCPManager
        from llmling_agent.messaging import EventManager
        from llmling_agent.messaging.connection_manager import ConnectionManager
    
        self.task_manager = TaskManager()
        self._name = name or self.__class__.__name__
        self.log = logger.bind(agent_name=self._name)
        self.agent_pool = agent_pool
        self.description = description
        self.connections = ConnectionManager(self)
        self._events = EventManager(self, enable_events=True)
        name_ = f"node_{self._name}"
        self.mcp = MCPManager(
            name_, servers=mcp_servers, input_provider=input_provider, owner=self.name
        )
        self.enable_db_logging = enable_logging
        self.conversation_id = str(uuid4())
    

    __rshift__

    __rshift__(other: MessageNode[Any, Any] | ProcessorCallback[Any]) -> Talk[TResult]
    
    __rshift__(
        other: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
    ) -> TeamTalk[TResult]
    
    __rshift__(
        other: (
            MessageNode[Any, Any]
            | ProcessorCallback[Any]
            | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]]
        ),
    ) -> Talk[Any] | TeamTalk[Any]
    

    Connect agent to another agent or group.

    Example

    agent >> other_agent # Connect to single agent agent >> (agent2 & agent3) # Connect to group agent >> "other_agent" # Connect by name (needs pool)

    Source code in src/llmling_agent/messaging/messagenode.py
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    def __rshift__(
        self,
        other: MessageNode[Any, Any]
        | ProcessorCallback[Any]
        | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
    ) -> Talk[Any] | TeamTalk[Any]:
        """Connect agent to another agent or group.
    
        Example:
            agent >> other_agent  # Connect to single agent
            agent >> (agent2 & agent3)  # Connect to group
            agent >> "other_agent"  # Connect by name (needs pool)
        """
        return self.connect_to(other)
    

    connect_to

    connect_to(
        target: MessageNode[Any, Any] | ProcessorCallback[Any],
        *,
        queued: Literal[True],
        queue_strategy: Literal["concat"]
    ) -> Talk[str]
    
    connect_to(
        target: MessageNode[Any, Any] | ProcessorCallback[Any],
        *,
        connection_type: ConnectionType = "run",
        name: str | None = None,
        priority: int = 0,
        delay: timedelta | None = None,
        queued: bool = False,
        queue_strategy: QueueStrategy = "latest",
        transform: AnyTransformFn[Any] | None = None,
        filter_condition: AsyncFilterFn | None = None,
        stop_condition: AsyncFilterFn | None = None,
        exit_condition: AsyncFilterFn | None = None
    ) -> Talk[TResult]
    
    connect_to(
        target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
        *,
        queued: Literal[True],
        queue_strategy: Literal["concat"]
    ) -> TeamTalk[str]
    
    connect_to(
        target: Sequence[MessageNode[Any, TResult] | ProcessorCallback[TResult]],
        *,
        connection_type: ConnectionType = "run",
        name: str | None = None,
        priority: int = 0,
        delay: timedelta | None = None,
        queued: bool = False,
        queue_strategy: QueueStrategy = "latest",
        transform: AnyTransformFn[Any] | None = None,
        filter_condition: AsyncFilterFn | None = None,
        stop_condition: AsyncFilterFn | None = None,
        exit_condition: AsyncFilterFn | None = None
    ) -> TeamTalk[TResult]
    
    connect_to(
        target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
        *,
        connection_type: ConnectionType = "run",
        name: str | None = None,
        priority: int = 0,
        delay: timedelta | None = None,
        queued: bool = False,
        queue_strategy: QueueStrategy = "latest",
        transform: AnyTransformFn[Any] | None = None,
        filter_condition: AsyncFilterFn | None = None,
        stop_condition: AsyncFilterFn | None = None,
        exit_condition: AsyncFilterFn | None = None
    ) -> TeamTalk
    
    connect_to(
        target: (
            MessageNode[Any, Any]
            | ProcessorCallback[Any]
            | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]]
        ),
        *,
        connection_type: ConnectionType = "run",
        name: str | None = None,
        priority: int = 0,
        delay: timedelta | None = None,
        queued: bool = False,
        queue_strategy: QueueStrategy = "latest",
        transform: AnyTransformFn[Any] | None = None,
        filter_condition: AsyncFilterFn | None = None,
        stop_condition: AsyncFilterFn | None = None,
        exit_condition: AsyncFilterFn | None = None
    ) -> Talk[Any] | TeamTalk
    

    Create connection(s) to target(s).

    Source code in src/llmling_agent/messaging/messagenode.py
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    def connect_to(
        self,
        target: MessageNode[Any, Any]
        | ProcessorCallback[Any]
        | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
        *,
        connection_type: ConnectionType = "run",
        name: str | None = None,
        priority: int = 0,
        delay: timedelta | None = None,
        queued: bool = False,
        queue_strategy: QueueStrategy = "latest",
        transform: AnyTransformFn[Any] | None = None,
        filter_condition: AsyncFilterFn | None = None,
        stop_condition: AsyncFilterFn | None = None,
        exit_condition: AsyncFilterFn | None = None,
    ) -> Talk[Any] | TeamTalk:
        """Create connection(s) to target(s)."""
        # Handle callable case
        from llmling_agent.agent import Agent
        from llmling_agent.delegation.base_team import BaseTeam
    
        if callable(target):
            target = Agent.from_callback(target)
            if pool := self.context.pool:
                pool.register(target.name, target)
        # we are explicit here just to make disctinction clear, we only want sequences
        # of message units
        if isinstance(target, Sequence) and not isinstance(target, BaseTeam):
            targets: list[MessageNode[Any, Any]] = []
            for t in target:
                match t:
                    case _ if callable(t):
                        other = Agent.from_callback(t)
                        if pool := self.context.pool:
                            pool.register(other.name, other)
                        targets.append(other)
                    case MessageNode():
                        targets.append(t)
                    case _:
                        msg = f"Invalid node type: {type(t)}"
                        raise TypeError(msg)
        else:
            targets = target  # type: ignore
        return self.connections.create_connection(
            self,
            targets,
            connection_type=connection_type,
            priority=priority,
            name=name,
            delay=delay,
            queued=queued,
            queue_strategy=queue_strategy,
            transform=transform,
            filter_condition=filter_condition,
            stop_condition=stop_condition,
            exit_condition=exit_condition,
        )
    

    disconnect_all async

    disconnect_all() -> None
    

    Disconnect from all nodes.

    Source code in src/llmling_agent/messaging/messagenode.py
    283
    284
    285
    286
    async def disconnect_all(self) -> None:
        """Disconnect from all nodes."""
        for target in list(self.connections.get_targets()):
            self.stop_passing_results_to(target)
    

    get_message_history async

    get_message_history(limit: int | None = None) -> list[ChatMessage[Any]]
    

    Get message history from storage.

    Source code in src/llmling_agent/messaging/messagenode.py
    296
    297
    298
    299
    300
    301
    302
    303
    304
    async def get_message_history(self, limit: int | None = None) -> list[ChatMessage[Any]]:
        """Get message history from storage."""
        if not self.enable_db_logging:
            return []  # No history if not logging
    
        from llmling_agent_config.session import SessionQuery
    
        query = SessionQuery(name=self.conversation_id, limit=limit)
        return await self.context.storage.filter_messages(query)
    

    get_stats abstractmethod async

    Get message statistics for this node.

    Source code in src/llmling_agent/messaging/messagenode.py
    311
    312
    313
    @abstractmethod
    async def get_stats(self) -> MessageStats | AggregatedMessageStats:
        """Get message statistics for this node."""
    

    log_message async

    log_message(message: ChatMessage[Any]) -> None
    

    Handle message from chat signal.

    Source code in src/llmling_agent/messaging/messagenode.py
    306
    307
    308
    309
    async def log_message(self, message: ChatMessage[Any]) -> None:
        """Handle message from chat signal."""
        if self.enable_db_logging:
            await self.context.storage.log_message(message)  # pyright: ignore
    

    run abstractmethod async

    run(*prompts: Any, **kwargs: Any) -> ChatMessage[TResult]
    

    Execute node with prompts. Implementation-specific run logic.

    Source code in src/llmling_agent/messaging/messagenode.py
    292
    293
    294
    @abstractmethod
    async def run(self, *prompts: Any, **kwargs: Any) -> ChatMessage[TResult]:
        """Execute node with prompts. Implementation-specific run logic."""
    

    run_iter abstractmethod

    run_iter(*prompts: Any, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]
    

    Yield messages during execution.

    Source code in src/llmling_agent/messaging/messagenode.py
    315
    316
    317
    @abstractmethod
    def run_iter(self, *prompts: Any, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]:
        """Yield messages during execution."""
    

    stop_passing_results_to

    stop_passing_results_to(other: MessageNode[Any, Any]) -> None
    

    Stop forwarding results to another node.

    Source code in src/llmling_agent/messaging/messagenode.py
    288
    289
    290
    def stop_passing_results_to(self, other: MessageNode[Any, Any]) -> None:
        """Stop forwarding results to another node."""
        self.connections.disconnect(other)
    

    Show source on GitHub