Skip to content

connection_manager

Class info

Classes

Name Children Inherits
AggregatedTalkStats
llmling_agent.talk.stats
Statistics aggregated from multiple connections.
    ConnectionManager
    llmling_agent.messaging.connection_manager
    Manages connections for both Agents and Teams.
      Talk
      llmling_agent.talk.talk
      Manages message flow between agents/groups.
        TeamTalk
        llmling_agent.talk.talk
        Group of connections with aggregate operations.

        🛈 DocStrings

        Manages message flow between agents/groups.

        ConnectionManager

        Manages connections for both Agents and Teams.

        Source code in src/llmling_agent/messaging/connection_manager.py
         33
         34
         35
         36
         37
         38
         39
         40
         41
         42
         43
         44
         45
         46
         47
         48
         49
         50
         51
         52
         53
         54
         55
         56
         57
         58
         59
         60
         61
         62
         63
         64
         65
         66
         67
         68
         69
         70
         71
         72
         73
         74
         75
         76
         77
         78
         79
         80
         81
         82
         83
         84
         85
         86
         87
         88
         89
         90
         91
         92
         93
         94
         95
         96
         97
         98
         99
        100
        101
        102
        103
        104
        105
        106
        107
        108
        109
        110
        111
        112
        113
        114
        115
        116
        117
        118
        119
        120
        121
        122
        123
        124
        125
        126
        127
        128
        129
        130
        131
        132
        133
        134
        135
        136
        137
        138
        139
        140
        141
        142
        143
        144
        145
        146
        147
        148
        149
        150
        151
        152
        153
        154
        155
        156
        157
        158
        159
        160
        161
        162
        163
        164
        165
        166
        167
        168
        169
        170
        171
        172
        173
        174
        175
        176
        177
        178
        179
        180
        181
        182
        183
        184
        185
        186
        187
        188
        189
        190
        191
        192
        193
        194
        195
        196
        197
        198
        199
        200
        201
        202
        203
        204
        205
        206
        207
        208
        209
        210
        211
        212
        213
        214
        215
        216
        217
        218
        219
        220
        221
        222
        223
        224
        225
        226
        227
        228
        229
        230
        231
        232
        233
        234
        235
        236
        237
        238
        239
        240
        241
        242
        243
        244
        245
        246
        247
        248
        249
        250
        251
        252
        253
        254
        255
        256
        257
        258
        259
        260
        261
        262
        263
        264
        265
        266
        267
        268
        269
        270
        271
        272
        273
        274
        275
        276
        277
        278
        279
        280
        281
        282
        283
        284
        285
        286
        287
        288
        289
        290
        291
        292
        293
        294
        295
        296
        297
        298
        299
        300
        301
        302
        303
        304
        305
        306
        307
        308
        309
        310
        311
        312
        313
        314
        315
        316
        317
        318
        319
        320
        321
        322
        class ConnectionManager:
            """Manages connections for both Agents and Teams.

            Holds the ``Talk`` connections created for a single owner node and
            provides helpers to create, inspect, trigger, pause and remove them.
            """

            # Aggregated signal, re-emitted by _handle_message_flow
            connection_processed = Signal(Talk.ConnectionProcessed)

            node_connected = Signal(object)  # emitted with the target node
            connection_added = Signal(Talk)  # emitted with the newly created Talk

            def __init__(self, owner: MessageEmitter):
                """Create a manager for the given owner.

                Args:
                    owner: Node whose connections are managed
                """
                self.owner = owner
                # NOTE(review): nothing in this class hooks the _on_talk_* handlers
                # up to this list - confirm whether that wiring happens elsewhere.
                self._connections = EventedList[Talk]()
                # Per-target flag consulted by route_message when no explicit
                # ``wait`` argument is given.
                self._wait_states: dict[AgentName, bool] = {}

            def __repr__(self):
                return f"ConnectionManager({self.owner})"

            def _on_talk_added(self, index: int, talk: Talk):
                """Connect to new talk's signal."""
                talk.connection_processed.connect(self._handle_message_flow)

            def _on_talk_removed(self, index: int, talk: Talk):
                """Disconnect from removed talk's signal."""
                talk.connection_processed.disconnect(self._handle_message_flow)

            def _on_talk_changed(self, index: int, old: Talk, new: Talk):
                """Update signal connections on talk change."""
                old.connection_processed.disconnect(self._handle_message_flow)
                new.connection_processed.connect(self._handle_message_flow)

            def _handle_message_flow(self, event: Talk.ConnectionProcessed):
                """Forward message flow to our aggregated signal."""
                self.connection_processed.emit(event)

            def set_wait_state(
                self,
                target: MessageEmitter | AgentName,
                wait: bool = True,
            ):
                """Set waiting behavior for target.

                Args:
                    target: Node (its ``name`` is used) or the name itself
                    wait: Flag stored for that name; route_message waits for
                        connections if any current target has a truthy flag
                """
                target_name = target if isinstance(target, str) else target.name
                self._wait_states[target_name] = wait

            async def wait_for_connections(self, _seen: set[AgentName] | None = None):
                """Wait for this agent and all connected agents to complete their tasks.

                Args:
                    _seen: Names already visited (internal, prevents cycles)
                """
                seen: set[AgentName] = _seen or {self.owner.name}  # type: ignore

                # Wait for our own tasks
                await self.owner.complete_tasks()

                # Wait for connected agents; mark before recursing so cycles terminate
                for agent in self.get_targets():
                    if agent.name not in seen:
                        seen.add(agent.name)
                        await agent.connections.wait_for_connections(seen)

            def get_targets(
                self, recursive: bool = False, _seen: set[AgentName] | None = None
            ) -> set[MessageNode]:
                """Get all currently connected target agents.

                Only targets of active connections are considered.

                Args:
                    recursive: Whether to include targets of targets
                    _seen: Names already visited (internal, prevents cycles)
                """
                # Get direct targets
                targets = {t for conn in self._connections for t in conn.targets if conn.active}

                if not recursive:
                    return targets

                # Track seen agents to prevent cycles
                seen = _seen or {self.owner.name}  # type: ignore
                all_targets: set[MessageNode] = set()

                for target in targets:
                    if target.name in seen:
                        continue
                    # Mark the node BEFORE descending into it; marking afterwards
                    # lets a cycle further down (B -> C -> B) recurse forever.
                    seen.add(target.name)
                    all_targets.add(target)
                    nested = target.connections.get_targets(recursive=True, _seen=seen)
                    all_targets.update(nested)

                return all_targets

            def has_connection_to(self, target: MessageNode) -> bool:
                """Check if target is connected via an active connection."""
                return any(target in conn.targets for conn in self._connections if conn.active)

            def create_connection(
                self,
                source: MessageEmitter,
                target: MessageNode | Sequence[MessageNode],
                *,
                connection_type: ConnectionType = "run",
                priority: int = 0,
                delay: timedelta | None = None,
                queued: bool = False,
                queue_strategy: QueueStrategy = "latest",
                transform: AnyTransformFn | None = None,
                filter_condition: AsyncFilterFn | None = None,
                stop_condition: AsyncFilterFn | None = None,
                exit_condition: AsyncFilterFn | None = None,
                name: str | None = None,
            ) -> Talk[Any] | TeamTalk:
                """Create connection(s) to target(s).

                A sequence of targets yields one Talk per target, wrapped in a
                TeamTalk; ``name`` is not applied in that case.

                Args:
                    source: Source agent or team
                    target: Single target or sequence of targets
                    connection_type: How to handle messages
                    priority: Task priority (lower = higher priority)
                    delay: Optional delay before processing
                    queued: Whether to queue messages for manual processing
                    queue_strategy: How to process queued messages
                    transform: Optional message transformation
                    filter_condition: When to filter messages
                    stop_condition: When to disconnect
                    exit_condition: When to exit application
                    name: Optional name for cross-referencing connections
                """
                if isinstance(target, Sequence):
                    # Create individual talks recursively
                    talks = [
                        self.create_connection(
                            source,
                            t,
                            connection_type=connection_type,
                            priority=priority,
                            delay=delay,
                            queued=queued,
                            queue_strategy=queue_strategy,
                            transform=transform,
                            filter_condition=filter_condition,
                            stop_condition=stop_condition,
                            exit_condition=exit_condition,
                            # Don't pass name - it should only apply to single connections
                        )
                        for t in target
                    ]
                    return TeamTalk(talks)

                # Single target case
                talk = Talk(
                    source=source,
                    targets=[target],
                    connection_type=connection_type,
                    name=name,
                    priority=priority,
                    delay=delay,
                    queued=queued,
                    queue_strategy=queue_strategy,
                    transform=transform,
                    filter_condition=filter_condition,
                    stop_condition=stop_condition,
                    exit_condition=exit_condition,
                )
                # TODO: better perhaps directly connect EventedList signal to node_connected?
                # or emit in _on_talk_added?
                self.node_connected.emit(target)
                self._connections.append(talk)
                self.connection_added.emit(talk)
                if source.context and (pool := source.context.pool):
                    # Register under the explicit name if given, otherwise let the
                    # registry pick one
                    if name:
                        pool.connection_registry.register(name, talk)
                    else:
                        pool.connection_registry.register_auto(talk)
                else:
                    logger.debug("Could not register connection %r, no pool available", name)
                return talk

            async def trigger_all(self) -> dict[AgentName, list[ChatMessage[Any]]]:
                """Trigger all queued connections.

                Returns:
                    Messages produced by the triggered talks, grouped by the name
                    of each talk's source.
                """
                results: dict[AgentName, list[ChatMessage[Any]]] = {}
                for talk in self._connections:
                    if isinstance(talk, Talk) and talk.queued:
                        messages = await talk.trigger()
                        # Accumulate: several queued talks may share one source,
                        # plain assignment would keep only the last result.
                        results.setdefault(talk.source.name, []).extend(messages)
                return results

            async def trigger_for(
                self, target: AgentName | MessageNode[Any, Any]
            ) -> list[ChatMessage[Any]]:
                """Trigger queued connections to specific target.

                Args:
                    target: Target node or its name

                Returns:
                    Messages produced by all matching queued talks.
                """
                target_name = target if isinstance(target, str) else target.name
                results = []
                for talk in self._connections:
                    # any() is required: a bare generator expression is always truthy
                    if talk.queued and any(t.name == target_name for t in talk.targets):
                        results.extend(await talk.trigger())
                return results

            def disconnect_all(self):
                """Disconnect all managed connections."""
                for conn in self._connections:
                    conn.disconnect()
                self._connections.clear()

            def disconnect(self, node: MessageNode):
                """Disconnect a specific node.

                Deactivates and removes every talk that has ``node`` as a target
                or as its source.
                """
                # Iterate over a snapshot: removing from the list being iterated
                # would skip the talk following each removed one.
                for talk in list(self._connections):
                    if node in talk.targets or node == talk.source:
                        talk.active = False
                        self._connections.remove(talk)

            async def route_message(self, message: ChatMessage[Any], wait: bool | None = None):
                """Route message to all connections.

                Args:
                    message: Message handed to every managed talk
                    wait: Whether to wait for connected nodes afterwards; if None,
                        the flags stored via set_wait_state decide
                """
                if wait is not None:
                    should_wait = wait
                else:
                    should_wait = any(
                        self._wait_states.get(t.name, False) for t in self.get_targets()
                    )
                msg = "ConnectionManager routing message from %s to %d connections"
                logger.debug(msg, message.name, len(self._connections))
                for talk in self._connections:
                    await talk._handle_message(message, None)

                if should_wait:
                    await self.wait_for_connections()

            @asynccontextmanager
            async def paused_routing(self):
                """Temporarily pause message routing to connections.

                Talks that were active on entry are deactivated and re-activated
                on exit, even if the body raises.
                """
                active_talks = [talk for talk in self._connections if talk.active]
                for talk in active_talks:
                    talk.active = False

                try:
                    yield self
                finally:
                    for talk in active_talks:
                        talk.active = True

            @property
            def stats(self) -> AggregatedTalkStats:
                """Get aggregated statistics for all connections."""
                return AggregatedTalkStats(stats=[conn.stats for conn in self._connections])

            def get_connections(self, recursive: bool = False) -> list[Talk[Any]]:
                """Get all Talk connections.

                Args:
                    recursive: Also collect the connections of every reachable
                        target node (each node is visited once)

                Returns:
                    Own connections first, followed by those of reachable nodes.
                """
                result = list(self._connections)
                if not recursive:
                    return result

                # One shared visited set for the whole walk, so cycles terminate and
                # nodes reachable via several paths are not listed twice.
                seen = {self.owner.name}
                idx = 0
                while idx < len(result):  # result grows while we walk it
                    for target in result[idx].targets:
                        if target.name not in seen:
                            seen.add(target.name)
                            result.extend(target.connections.get_connections())
                    idx += 1

                return result

            def get_mermaid_diagram(
                self,
                include_details: bool = True,
                recursive: bool = True,
            ) -> str:
                """Generate mermaid flowchart of all connections.

                Args:
                    include_details: Label each edge with connection type, queue
                        strategy and condition names
                    recursive: Passed on to get_connections
                """
                lines = ["flowchart LR"]
                connections = self.get_connections(recursive=recursive)

                for talk in connections:
                    source = talk.source.name
                    for target in talk.targets:
                        if not include_details:
                            lines.append(f"    {source}-->{target.name}")
                            continue
                        details: list[str] = [talk.connection_type]
                        if talk.queued:
                            details.append(f"queued({talk.queue_strategy})")
                        conditions = (
                            ("filter", talk.filter_condition),
                            ("stop", talk.stop_condition),
                            ("exit", talk.exit_condition),
                        )
                        details.extend(f"{kind}:{fn.__name__}" for kind, fn in conditions if fn)
                        # Generic marker, only added when no exit condition is set
                        if not talk.exit_condition and (
                            talk.filter_condition or talk.stop_condition
                        ):
                            details.append("conditions")

                        # details always holds at least the connection type
                        lines.append(f"    {source}--|{' '.join(details)}|-->{target.name}")

                return "\n".join(lines)
        

        stats property

        Get aggregated statistics for all connections.

        _handle_message_flow

        _handle_message_flow(event: ConnectionProcessed)
        

        Forward message flow to our aggregated signal.

        Source code in src/llmling_agent/messaging/connection_manager.py
        63
        64
        65
        def _handle_message_flow(self, event: Talk.ConnectionProcessed):
            """Forward message flow to our aggregated signal."""
            self.connection_processed.emit(event)
        

        _on_talk_added

        _on_talk_added(index: int, talk: Talk)
        

        Connect to new talk's signal.

        Source code in src/llmling_agent/messaging/connection_manager.py
        50
        51
        52
        def _on_talk_added(self, index: int, talk: Talk):
            """Connect to new talk's signal."""
            talk.connection_processed.connect(self._handle_message_flow)
        

        _on_talk_changed

        _on_talk_changed(index: int, old: Talk, new: Talk)
        

        Update signal connections on talk change.

        Source code in src/llmling_agent/messaging/connection_manager.py
        58
        59
        60
        61
        def _on_talk_changed(self, index: int, old: Talk, new: Talk):
            """Update signal connections on talk change."""
            old.connection_processed.disconnect(self._handle_message_flow)
            new.connection_processed.connect(self._handle_message_flow)
        

        _on_talk_removed

        _on_talk_removed(index: int, talk: Talk)
        

        Disconnect from removed talk's signal.

        Source code in src/llmling_agent/messaging/connection_manager.py
        54
        55
        56
        def _on_talk_removed(self, index: int, talk: Talk):
            """Disconnect from removed talk's signal."""
            talk.connection_processed.disconnect(self._handle_message_flow)
        

        create_connection

        create_connection(
            source: MessageEmitter,
            target: MessageNode | Sequence[MessageNode],
            *,
            connection_type: ConnectionType = "run",
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
            name: str | None = None,
        ) -> Talk[Any] | TeamTalk
        

        Create connection(s) to target(s).

        Parameters:

        Name Type Description Default
        source MessageEmitter

        Source agent or team

        required
        target MessageNode | Sequence[MessageNode]

        Single target or sequence of targets

        required
        connection_type ConnectionType

        How to handle messages

        'run'
        priority int

        Task priority (lower = higher priority)

        0
        delay timedelta | None

        Optional delay before processing

        None
        queued bool

        Whether to queue messages for manual processing

        False
        queue_strategy QueueStrategy

        How to process queued messages

        'latest'
        transform AnyTransformFn | None

        Optional message transformation

        None
        filter_condition AsyncFilterFn | None

        When to filter messages

        None
        stop_condition AsyncFilterFn | None

        When to disconnect

        None
        exit_condition AsyncFilterFn | None

        When to exit application

        None
        name str | None

        Optional name for cross-referencing connections

        None
        Source code in src/llmling_agent/messaging/connection_manager.py
        120
        121
        122
        123
        124
        125
        126
        127
        128
        129
        130
        131
        132
        133
        134
        135
        136
        137
        138
        139
        140
        141
        142
        143
        144
        145
        146
        147
        148
        149
        150
        151
        152
        153
        154
        155
        156
        157
        158
        159
        160
        161
        162
        163
        164
        165
        166
        167
        168
        169
        170
        171
        172
        173
        174
        175
        176
        177
        178
        179
        180
        181
        182
        183
        184
        185
        186
        187
        188
        189
        190
        191
        192
        193
        194
        195
        196
        197
        198
        199
        200
        201
        def create_connection(
            self,
            source: MessageEmitter,
            target: MessageNode | Sequence[MessageNode],
            *,
            connection_type: ConnectionType = "run",
            priority: int = 0,
            delay: timedelta | None = None,
            queued: bool = False,
            queue_strategy: QueueStrategy = "latest",
            transform: AnyTransformFn | None = None,
            filter_condition: AsyncFilterFn | None = None,
            stop_condition: AsyncFilterFn | None = None,
            exit_condition: AsyncFilterFn | None = None,
            name: str | None = None,
        ) -> Talk[Any] | TeamTalk:
            """Create connection(s) to target(s).

            A sequence of targets produces one Talk per element, returned as a
            TeamTalk; ``name`` is only used for a single target.

            Args:
                source: Source agent or team
                target: Single target or sequence of targets
                connection_type: How to handle messages
                priority: Task priority (lower = higher priority)
                delay: Optional delay before processing
                queued: Whether to queue messages for manual processing
                queue_strategy: How to process queued messages
                transform: Optional message transformation
                filter_condition: When to filter messages
                stop_condition: When to disconnect
                exit_condition: When to exit application
                name: Optional name for cross-referencing connections
            """
            # Options shared by every Talk created here (everything except name)
            options = {
                "connection_type": connection_type,
                "priority": priority,
                "delay": delay,
                "queued": queued,
                "queue_strategy": queue_strategy,
                "transform": transform,
                "filter_condition": filter_condition,
                "stop_condition": stop_condition,
                "exit_condition": exit_condition,
            }

            if isinstance(target, Sequence):
                # One recursive call per node; name is deliberately not forwarded
                # since it should only apply to single connections
                members = [self.create_connection(source, node, **options) for node in target]
                return TeamTalk(members)

            # Single target case
            talk = Talk(source=source, targets=[target], name=name, **options)
            # TODO: better perhaps directly connect EventedList signal to node_connected?
            # or emit in _on_talk_added?
            self.node_connected.emit(target)
            self._connections.append(talk)
            self.connection_added.emit(talk)

            pool = source.context.pool if source.context else None
            if not pool:
                logger.debug("Could not register connection %r, no pool available", name)
                return talk

            # Explicit name wins, otherwise the registry chooses one
            registry = pool.connection_registry
            if name:
                registry.register(name, talk)
            else:
                registry.register_auto(talk)
            return talk
        

        disconnect

        disconnect(node: MessageNode)
        

        Disconnect a specific node.

        Source code in src/llmling_agent/messaging/connection_manager.py
        228
        229
        230
        231
        232
        233
        def disconnect(self, node: MessageNode):
            """Disconnect a specific node.

            Every talk that has ``node`` among its targets or as its source is
            deactivated and removed from the managed connections.

            Args:
                node: Node whose connections should be dropped
            """
            # Iterate over a snapshot: removing from the list being iterated
            # would skip the talk following each removed one.
            for talk in list(self._connections):
                if node in talk.targets or node == talk.source:
                    talk.active = False
                    self._connections.remove(talk)
        

        disconnect_all

        disconnect_all()
        

        Disconnect all managed connections.

        Source code in src/llmling_agent/messaging/connection_manager.py
        222
        223
        224
        225
        226
        def disconnect_all(self):
            """Call ``disconnect()`` on every managed talk, then empty the list."""
            talks = self._connections
            for talk in talks:
                talk.disconnect()
            talks.clear()
        

        get_connections

        get_connections(recursive: bool = False) -> list[Talk[Any]]
        

        Get all Talk connections, flattening TeamTalks.

        Source code in src/llmling_agent/messaging/connection_manager.py
        269
        270
        271
        272
        273
        274
        275
        276
        277
        278
        279
        280
        281
        282
        283
        284
        285
        def get_connections(self, recursive: bool = False) -> list[Talk[Any]]:
            """Get all Talk connections.

            Args:
                recursive: Also collect the connections of every reachable
                    target node (each node is visited once)

            Returns:
                Own connections first, followed by those of reachable nodes.
            """
            result = list(self._connections)
            if not recursive:
                return result

            # One shared visited set for the whole walk, so cycles terminate and
            # nodes reachable via several paths are not listed twice.
            seen = {self.owner.name}
            idx = 0
            while idx < len(result):  # result grows while we walk it
                for target in result[idx].targets:
                    if target.name not in seen:
                        seen.add(target.name)
                        result.extend(target.connections.get_connections())
                idx += 1

            return result
        

        get_mermaid_diagram

        get_mermaid_diagram(include_details: bool = True, recursive: bool = True) -> str
        

        Generate mermaid flowchart of all connections.

        Source code in src/llmling_agent/messaging/connection_manager.py
        287
        288
        289
        290
        291
        292
        293
        294
        295
        296
        297
        298
        299
        300
        301
        302
        303
        304
        305
        306
        307
        308
        309
        310
        311
        312
        313
        314
        315
        316
        317
        318
        319
        320
        321
        322
        def get_mermaid_diagram(
            self,
            include_details: bool = True,
            recursive: bool = True,
        ) -> str:
            """Render the connections as a left-to-right mermaid flowchart.

            Args:
                include_details: Label each edge with connection type, queue
                    strategy and condition names
                recursive: Passed on to get_connections
            """
            rows = ["flowchart LR"]

            for talk in self.get_connections(recursive=recursive):
                src = talk.source.name
                for node in talk.targets:
                    if not include_details:
                        rows.append(f"    {src}-->{node.name}")
                        continue

                    parts: list[str] = [talk.connection_type]
                    if talk.queued:
                        parts.append(f"queued({talk.queue_strategy})")
                    named_conditions = (
                        ("filter", talk.filter_condition),
                        ("stop", talk.stop_condition),
                        ("exit", talk.exit_condition),
                    )
                    parts.extend(
                        f"{kind}:{fn.__name__}" for kind, fn in named_conditions if fn
                    )
                    # Generic marker, only added when no exit condition is set
                    if not talk.exit_condition and (
                        talk.filter_condition or talk.stop_condition
                    ):
                        parts.append("conditions")

                    # parts always contains at least the connection type
                    rows.append(f"    {src}--|{' '.join(parts)}|-->{node.name}")

            return "\n".join(rows)
        

        get_targets

        get_targets(
            recursive: bool = False, _seen: set[AgentName] | None = None
        ) -> set[MessageNode]
        

        Get all currently connected target agents.

        Parameters:

        Name Type Description Default
        recursive bool

        Whether to include targets of targets

        False
        Source code in src/llmling_agent/messaging/connection_manager.py
         89
         90
         91
         92
         93
         94
         95
         96
         97
         98
         99
        100
        101
        102
        103
        104
        105
        106
        107
        108
        109
        110
        111
        112
        113
        114
        def get_targets(
            self, recursive: bool = False, _seen: set[AgentName] | None = None
        ) -> set[MessageNode]:
            """Get all currently connected target agents.

            Only targets of active connections are considered.

            Args:
                recursive: Whether to include targets of targets
                _seen: Names already visited (internal, prevents cycles)
            """
            # Get direct targets
            targets = {t for conn in self._connections for t in conn.targets if conn.active}

            if not recursive:
                return targets

            # Track seen agents to prevent cycles
            seen = _seen or {self.owner.name}  # type: ignore
            all_targets: set[MessageNode] = set()

            for target in targets:
                if target.name in seen:
                    continue
                # Mark the node BEFORE descending into it; marking afterwards
                # lets a cycle further down (B -> C -> B) recurse forever.
                seen.add(target.name)
                all_targets.add(target)
                nested = target.connections.get_targets(recursive=True, _seen=seen)
                all_targets.update(nested)

            return all_targets
        

        has_connection_to

        has_connection_to(target: MessageNode) -> bool
        

        Check if target is connected.

        Source code in src/llmling_agent/messaging/connection_manager.py
        116
        117
        118
        def has_connection_to(self, target: MessageNode) -> bool:
            """Return whether an active connection lists ``target`` among its targets."""
            for connection in self._connections:
                # Inactive connections are ignored entirely.
                if connection.active and target in connection.targets:
                    return True
            return False
        

        paused_routing async

        paused_routing()
        

        Temporarily pause message routing to connections.

        Source code in src/llmling_agent/messaging/connection_manager.py
        251
        252
        253
        254
        255
        256
        257
        258
        259
        260
        261
        262
        @asynccontextmanager
        async def paused_routing(self):
            """Deactivate the currently active connections for the duration of the block.

            Yields this manager. On exit (including on error) exactly the
            connections that were active on entry are set back to active;
            connections that were already inactive are left untouched.
            """
            paused = [conn for conn in self._connections if conn.active]

            def _switch(state: bool) -> None:
                # Apply the same active flag to every connection captured on entry.
                for conn in paused:
                    conn.active = state

            _switch(False)
            try:
                yield self
            finally:
                _switch(True)
        

        route_message async

        route_message(message: ChatMessage[Any], wait: bool | None = None)
        

        Route message to all connections.

        Source code in src/llmling_agent/messaging/connection_manager.py
        235
        236
        237
        238
        239
        240
        241
        242
        243
        244
        245
        246
        247
        248
        249
        async def route_message(self, message: ChatMessage[Any], wait: bool | None = None):
            """Hand a message to every connection, optionally waiting afterwards.

            Args:
                message: The message passed to each connection's handler.
                wait: Explicit override for waiting on connections once routing
                    is done. When None, waiting happens if any current target
                    has a truthy entry in the wait-state table.
            """
            # The wait decision is taken before routing, as in the original flow.
            should_wait = wait
            if should_wait is None:
                should_wait = any(
                    self._wait_states.get(node.name, False) for node in self.get_targets()
                )

            logger.debug(
                "ConnectionManager routing message from %s to %d connections",
                message.name,
                len(self._connections),
            )
            for connection in self._connections:
                await connection._handle_message(message, None)

            if should_wait:
                await self.wait_for_connections()
        

        set_wait_state

        set_wait_state(target: MessageEmitter | AgentName, wait: bool = True)
        

        Set waiting behavior for target.

        Source code in src/llmling_agent/messaging/connection_manager.py
        67
        68
        69
        70
        71
        72
        73
        74
        def set_wait_state(
            self,
            target: MessageEmitter | AgentName,
            wait: bool = True,
        ):
            """Record the wait flag for a target.

            Args:
                target: Either a name string or an object exposing ``name``.
                wait: Flag stored for that name in the wait-state table.
            """
            if isinstance(target, str):
                key = target
            else:
                key = target.name
            self._wait_states[key] = wait
        

        trigger_all async

        trigger_all() -> dict[AgentName, list[ChatMessage[Any]]]
        

        Trigger all queued connections.

        Source code in src/llmling_agent/messaging/connection_manager.py
        203
        204
        205
        206
        207
        208
        209
        async def trigger_all(self) -> dict[AgentName, list[ChatMessage[Any]]]:
            """Trigger all queued connections.

            Only connections that are ``Talk`` instances with a truthy ``queued``
            flag are triggered; they are awaited one after another.

            Returns:
                The triggered messages grouped by the name of each connection's
                source. Results of connections sharing a source are concatenated
                in connection order.
            """
            results: dict[AgentName, list[ChatMessage[Any]]] = {}
            for talk in self._connections:
                if isinstance(talk, Talk) and talk.queued:
                    messages = await talk.trigger()
                    # Several connections can share one source name. Accumulate
                    # rather than assign, so earlier results are not overwritten.
                    results.setdefault(talk.source.name, []).extend(messages)
            return results
        

        trigger_for async

        trigger_for(target: AgentName | MessageNode[Any, Any]) -> list[ChatMessage[Any]]
        

        Trigger queued connections to a specific target.

        Source code in src/llmling_agent/messaging/connection_manager.py
        211
        212
        213
        214
        215
        216
        217
        218
        219
        220
        async def trigger_for(
            self, target: AgentName | MessageNode[Any, Any]
        ) -> list[ChatMessage[Any]]:
            """Trigger queued connections to a specific target.

            Args:
                target: Name of the target, or an object exposing ``name``.

            Returns:
                The concatenated results of every queued connection that has a
                target with the given name, in connection order.
            """
            target_name = target if isinstance(target, str) else target.name
            results = []
            for talk in self._connections:
                # any() is required here: a bare generator expression is always
                # truthy, which would trigger every queued connection regardless
                # of its targets.
                if talk.queued and any(t.name == target_name for t in talk.targets):
                    results.extend(await talk.trigger())
            return results
        

        wait_for_connections async

        wait_for_connections(_seen: set[AgentName] | None = None)
        

        Wait for this agent and all connected agents to complete their tasks.

        Source code in src/llmling_agent/messaging/connection_manager.py
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        async def wait_for_connections(self, _seen: set[AgentName] | None = None):
            """Await the owner's tasks, then those of every not-yet-visited target.

            Args:
                _seen: Names already visited in the current walk. The set is
                    shared with the nested calls and grows as targets are visited,
                    so no name is waited on twice.
            """
            visited: set[AgentName] = _seen or {self.owner.name}  # type: ignore

            # Our own outstanding work comes first.
            await self.owner.complete_tasks()

            # Then descend into each target, one at a time.
            for node in self.get_targets():
                if node.name in visited:
                    continue
                visited.add(node.name)
                await node.connections.wait_for_connections(visited)