talk

Class info

Classes

AggregatedTalkStats (llmling_agent.talk.stats): Statistics aggregated from multiple connections.
ConnectionRegistry (llmling_agent.talk.registry): Registry for managing named connections.
Talk (llmling_agent.talk.talk): Manages message flow between agents/groups.
TalkStats (llmling_agent.talk.stats): Statistics for a single connection.
TeamTalk (llmling_agent.talk.talk): Group of connections with aggregate operations.

          🛈 DocStrings

          Talk classes.

          AggregatedTalkStats dataclass

          Bases: AggregatedMessageStats

          Statistics aggregated from multiple connections.

          Source code in src/llmling_agent/talk/stats.py, lines 129-157
          @dataclass(kw_only=True)
          class AggregatedTalkStats(AggregatedMessageStats):
              """Statistics aggregated from multiple connections."""
          
              stats: Sequence[TalkStats | AggregatedTalkStats] = field(default_factory=list)
          
              @cached_property
              def source_names(self) -> set[str]:
                  """Set of unique source names recursively."""
          
                  def _collect_source_names(stat: TalkStats | AggregatedTalkStats) -> set[str]:
                      """Recursively collect source names."""
                      if isinstance(stat, TalkStats):
                          return {stat.source_name} if stat.source_name else set()
                      # It's a AggregatedTalkStats, recurse
                      names = set()
                      for s in stat.stats:
                          names.update(_collect_source_names(s))
                      return names
          
                  names = set()
                  for stat in self.stats:
                      names.update(_collect_source_names(stat))
                  return names
          
              @cached_property
              def target_names(self) -> set[str]:
                  """Set of all target names across connections."""
                  return {name for s in self.stats for name in s.target_names}
          

          source_names cached property

          source_names: set[str]
          

          Set of unique source names recursively.

          target_names cached property

          target_names: set[str]
          

          Set of all target names across connections.
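
          A minimal usage sketch (illustrative, not taken from the source); it assumes TalkStats
          can be constructed with just source_name and target_names, as seen in Talk.__init__ below:

              from llmling_agent.talk.stats import AggregatedTalkStats, TalkStats

              # Two hypothetical per-connection stats objects (field names as in the listings above)
              stats_a = TalkStats(source_name="planner", target_names={"coder"})
              stats_b = TalkStats(source_name="coder", target_names={"reviewer", "tester"})

              aggregated = AggregatedTalkStats(stats=[stats_a, stats_b])
              print(aggregated.source_names)  # {"planner", "coder"} (set order may vary)
              print(aggregated.target_names)  # {"coder", "reviewer", "tester"}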

          ConnectionRegistry

          Bases: BaseRegistry[str, Talk]

          Registry for managing named connections.

          Allows looking up Talk instances by their name. Only named connections get registered.
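
          Every registered Talk's connection_processed signal is re-emitted on the registry-wide
          message_flow signal (see _handle_message_flow below), so a single handler can observe
          all named connections. A hedged sketch:

              # `pool` stands for an agent pool exposing `connection_registry`
              # (as referenced in the Talk source below); it is not defined on this page.
              def log_flow(event):  # event is a Talk.ConnectionProcessed
                  target_names = [t.name for t in event.targets]
                  print(f"{event.source.name} -> {target_names} ({event.connection_type})")

              pool.connection_registry.message_flow.connect(log_flow)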

          Source code in src/llmling_agent/talk/registry.py, lines 60-128
          class ConnectionRegistry(BaseRegistry[str, Talk]):
              """Registry for managing named connections.
          
              Allows looking up Talk instances by their name. Only named
              connections get registered.
              """
          
              message_flow = Signal(Talk.ConnectionProcessed)
          
              def __init__(self, *args, **kwargs):
                  """Initialize registry and connect event handlers."""
                  super().__init__(*args, **kwargs)
                  # Connect handlers to EventedDict events
                  self._items.events.added.connect(self._on_talk_added)
                  self._items.events.removed.connect(self._on_talk_removed)
                  self._items.events.changed.connect(self._on_talk_changed)
          
              def _on_talk_added(self, name: str, talk: Talk):
                  """Handle new talk being added to registry."""
                  talk.connection_processed.connect(self._handle_message_flow)
                  logger.debug("Connected signal for talk: %s", name)
          
              def _on_talk_removed(self, name: str, talk: Talk):
                  """Handle talk being removed from registry."""
                  talk.connection_processed.disconnect(self._handle_message_flow)
                  logger.debug("Disconnected signal for talk: %s", name)
          
              def _on_talk_changed(self, name: str, old_talk: Talk, new_talk: Talk):
                  """Handle talk being replaced in registry."""
                  old_talk.connection_processed.disconnect(self._handle_message_flow)
                  new_talk.connection_processed.connect(self._handle_message_flow)
                  logger.debug("Reconnected signal for talk: %s", name)
          
              def _handle_message_flow(self, event: Talk.ConnectionProcessed):
                  """Forward message flow to global stream."""
                  self.message_flow.emit(event)
          
              @property
              def _error_class(self) -> type[ConnectionRegistryError]:
                  return ConnectionRegistryError
          
              def _validate_item(self, item: Any) -> Talk:
                  """Ensure only Talk instances can be registered."""
                  if not isinstance(item, Talk):
                      msg = f"Expected Talk instance, got {type(item)}"
                      raise self._error_class(msg)
          
                  return item
          
              def register_auto(self, talk: Talk[Any], base_name: str | None = None) -> str:
                  """Register talk with auto-generated unique name.
          
                  Args:
                      talk: Talk instance to register
                      base_name: Optional base name to use (defaults to talk.name)
          
                  Returns:
                      The actual name used for registration
                  """
                  base = base_name or talk.name
                  counter = 1
                  name = base
          
                  while name in self:
                      name = f"{base}_{counter}"
                      counter += 1
                  talk.name = name
                  self.register(name, talk)
                  return name
          

          __init__

          __init__(*args, **kwargs)
          

          Initialize registry and connect event handlers.

          Source code in src/llmling_agent/talk/registry.py, lines 69-75
          def __init__(self, *args, **kwargs):
              """Initialize registry and connect event handlers."""
              super().__init__(*args, **kwargs)
              # Connect handlers to EventedDict events
              self._items.events.added.connect(self._on_talk_added)
              self._items.events.removed.connect(self._on_talk_removed)
              self._items.events.changed.connect(self._on_talk_changed)
          

          _handle_message_flow

          _handle_message_flow(event: ConnectionProcessed)
          

          Forward message flow to global stream.

          Source code in src/llmling_agent/talk/registry.py, lines 93-95
          def _handle_message_flow(self, event: Talk.ConnectionProcessed):
              """Forward message flow to global stream."""
              self.message_flow.emit(event)
          

          _on_talk_added

          _on_talk_added(name: str, talk: Talk)
          

          Handle new talk being added to registry.

          Source code in src/llmling_agent/talk/registry.py, lines 77-80
          def _on_talk_added(self, name: str, talk: Talk):
              """Handle new talk being added to registry."""
              talk.connection_processed.connect(self._handle_message_flow)
              logger.debug("Connected signal for talk: %s", name)
          

          _on_talk_changed

          _on_talk_changed(name: str, old_talk: Talk, new_talk: Talk)
          

          Handle talk being replaced in registry.

          Source code in src/llmling_agent/talk/registry.py, lines 87-91
          def _on_talk_changed(self, name: str, old_talk: Talk, new_talk: Talk):
              """Handle talk being replaced in registry."""
              old_talk.connection_processed.disconnect(self._handle_message_flow)
              new_talk.connection_processed.connect(self._handle_message_flow)
              logger.debug("Reconnected signal for talk: %s", name)
          

          _on_talk_removed

          _on_talk_removed(name: str, talk: Talk)
          

          Handle talk being removed from registry.

          Source code in src/llmling_agent/talk/registry.py, lines 82-85
          def _on_talk_removed(self, name: str, talk: Talk):
              """Handle talk being removed from registry."""
              talk.connection_processed.disconnect(self._handle_message_flow)
              logger.debug("Disconnected signal for talk: %s", name)
          

          _validate_item

          _validate_item(item: Any) -> Talk
          

          Ensure only Talk instances can be registered.

          Source code in src/llmling_agent/talk/registry.py, lines 101-107
          def _validate_item(self, item: Any) -> Talk:
              """Ensure only Talk instances can be registered."""
              if not isinstance(item, Talk):
                  msg = f"Expected Talk instance, got {type(item)}"
                  raise self._error_class(msg)
          
              return item
          

          register_auto

          register_auto(talk: Talk[Any], base_name: str | None = None) -> str
          

          Register talk with auto-generated unique name.

          Parameters:

              talk (Talk[Any]): Talk instance to register. Required.
              base_name (str | None): Optional base name to use (defaults to talk.name). Default: None.

          Returns:

              str: The actual name used for registration.

          Source code in src/llmling_agent/talk/registry.py, lines 109-128
          def register_auto(self, talk: Talk[Any], base_name: str | None = None) -> str:
              """Register talk with auto-generated unique name.
          
              Args:
                  talk: Talk instance to register
                  base_name: Optional base name to use (defaults to talk.name)
          
              Returns:
                  The actual name used for registration
              """
              base = base_name or talk.name
              counter = 1
              name = base
          
              while name in self:
                  name = f"{base}_{counter}"
                  counter += 1
              talk.name = name
              self.register(name, talk)
              return name
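
          Usage sketch (illustrative; it assumes the registry can be built without arguments and
          that talk_a/talk_b are existing Talk instances): a second registration under the same
          base name gets a counter suffix, and talk.name is updated to match.

              registry = ConnectionRegistry()  # assumes BaseRegistry needs no constructor arguments
              first = registry.register_auto(talk_a, base_name="pipeline")   # -> "pipeline"
              second = registry.register_auto(talk_b, base_name="pipeline")  # -> "pipeline_1"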
          

          Talk

          Manages message flow between agents/groups.

          Source code in src/llmling_agent/talk/talk.py, lines 39-499
          class Talk[TTransmittedData]:
              """Manages message flow between agents/groups."""
          
              @dataclass(frozen=True)
              class ConnectionProcessed:
                  """Event emitted when a message flows through a connection."""
          
                  message: ChatMessage
                  source: MessageEmitter
                  targets: list[MessageNode]
                  queued: bool
                  connection_type: ConnectionType
                  timestamp: datetime = field(default_factory=datetime.now)
          
              # Original message "coming in"
              message_received = Signal(ChatMessage)
              # After any transformation (one for each message, not per target)
              message_forwarded = Signal(ChatMessage)
              # Comprehensive signal capturing all information about one "message handling process"
              connection_processed = Signal(ConnectionProcessed)
          
              def __init__(
                  self,
                  source: MessageEmitter,
                  targets: Sequence[MessageNode],
                  group: TeamTalk | None = None,
                  *,
                  name: str | None = None,
                  connection_type: ConnectionType = "run",
                  wait_for_connections: bool = False,
                  priority: int = 0,
                  delay: timedelta | None = None,
                  queued: bool = False,
                  queue_strategy: QueueStrategy = "latest",
                  transform: AnyTransformFn[ChatMessage[TTransmittedData]] | None = None,
                  filter_condition: AnyFilterFn | None = None,
                  stop_condition: AnyFilterFn | None = None,
                  exit_condition: AnyFilterFn | None = None,
              ):
                  """Initialize talk connection.
          
                  Args:
                      source: Agent sending messages
                      targets: Agents receiving messages
                      group: Optional group this talk belongs to
                      name: Optional name for this talk
                      connection_type: How to handle messages:
                          - "run": Execute message as a new run in target
                          - "context": Add message as context to target
                          - "forward": Forward message to target's outbox
                      wait_for_connections: Whether to wait for all targets to complete
                      priority: Task priority (lower = higher priority)
                      delay: Optional delay before processing
                      queued: Whether messages should be queued for manual processing
                      queue_strategy: How to process queued messages:
                          - "concat": Combine all messages with newlines
                          - "latest": Use only the most recent message
                          - "buffer": Process all messages individually
                      transform: Optional function to transform messages
                      filter_condition: Optional condition for filtering messages
                      stop_condition: Optional condition for disconnecting
                      exit_condition: Optional condition for stopping the event loop
                  """
                  self.source = source
                  self.targets = list(targets)
                  # Could perhaps better be an auto-inferring property
                  self.name = name or f"{source.name}->{[t.name for t in targets]}"
                  self.group = group
                  self.priority = priority
                  self.delay = delay
                  self.active = True
                  self.connection_type = connection_type
                  self.wait_for_connections = wait_for_connections
                  self.queued = queued
                  self.queue_strategy = queue_strategy
                  self._pending_messages = defaultdict[str, list[ChatMessage[TTransmittedData]]](
                      list
                  )
                  names = {t.name for t in targets}
                  self._stats = TalkStats(source_name=source.name, target_names=names)
                  self.transform_fn = transform
                  self.filter_condition = filter_condition
                  self.stop_condition = stop_condition
                  self.exit_condition = exit_condition
          
              def __repr__(self):
                  targets = [t.name for t in self.targets]
                  return f"<Talk({self.connection_type}) {self.source.name} -> {targets}>"
          
              @overload
              def __rshift__(
                  self,
                  other: MessageNode[Any, str]
                  | ProcessorCallback[str]
                  | Sequence[MessageNode[Any, str] | ProcessorCallback[str]],
              ) -> TeamTalk[str]: ...
          
              @overload
              def __rshift__(
                  self,
                  other: MessageNode[Any, Any]
                  | ProcessorCallback[Any]
                  | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
              ) -> TeamTalk[Any]: ...
          
              def __rshift__(
                  self,
                  other: MessageNode[Any, Any]
                  | ProcessorCallback[Any]
                  | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
              ) -> TeamTalk[Any]:
                  """Add another node as target to the connection or group.
          
                  Example:
                      connection >> other_agent  # Connect to single agent
                      connection >> (agent2 & agent3)  # Connect to group
                  """
                  from llmling_agent.agent import Agent, StructuredAgent
                  from llmling_agent.messaging.messagenode import MessageNode
                  from llmling_agent.talk import TeamTalk
                  from llmling_agent.utils.inspection import has_return_type
          
                  match other:
                      case Callable():
                          if has_return_type(other, str):
                              other = Agent.from_callback(other)
                          else:
                              other = StructuredAgent.from_callback(other)
                          if pool := self.source.context.pool:
                              pool.register(other.name, other)
                          return self.__rshift__(other)
                      case Sequence():
                          team_talks = [self.__rshift__(o) for o in other]
                          return TeamTalk([self, *team_talks])
                      case MessageNode():
                          talks = [t.__rshift__(other) for t in self.targets]
                          return TeamTalk([self, *talks])
                      case _:
                          msg = f"Invalid agent type: {type(other)}"
                          raise TypeError(msg)
          
              async def _evaluate_condition(
                  self,
                  condition: Callable[..., bool | Awaitable[bool]] | None,
                  message: ChatMessage[Any],
                  target: MessageNode,
                  *,
                  default_return: bool = False,
              ) -> bool:
                  """Evaluate a condition with flexible parameter handling."""
                  from llmling_agent.talk.registry import EventContext
          
                  if not condition:
                      return default_return
                  registry = (
                      context.pool.connection_registry
                      if (context := self.source.context) and context.pool
                      else None
                  )
                  ctx = EventContext(
                      message=message,
                      target=target,
                      stats=self.stats,
                      registry=registry,
                      talk=self,
                  )
                  return await execute(condition, ctx)
          
              def on_event(
                  self,
                  event_type: ConnectionEventType,
                  callback: Callable[
                      [ConnectionEventData[TTransmittedData]], None | Awaitable[None]
                  ],
              ) -> Self:
                  """Register callback for connection events."""
                  from llmling_agent.messaging.events import ConnectionEventData
          
                  async def wrapped_callback(event: EventData):
                      if isinstance(event, ConnectionEventData) and event.event_type == event_type:
                          await execute(callback, event)
          
                  self.source._events.add_callback(wrapped_callback)
                  return self
          
              async def _emit_connection_event(
                  self,
                  event_type: ConnectionEventType,
                  message: ChatMessage[TTransmittedData] | None,
              ):
                  from llmling_agent.messaging.events import ConnectionEventData
          
                  event = ConnectionEventData[Any](
                      connection=self,
                      source="connection",
                      connection_name=self.name,
                      event_type=event_type,
                      message=message,
                      timestamp=datetime.now(),
                  )
                  # Propagate to all event managers through registry
                  if self.source.context and (pool := self.source.context.pool):
                      for connection in pool.connection_registry.values():
                          await connection.source._events.emit_event(event)
          
              async def _handle_message(
                  self,
                  message: ChatMessage[TTransmittedData],
                  prompt: str | None = None,
              ) -> list[ChatMessage[Any]]:
                  """Handle message forwarding based on connection configuration."""
                  # 2. Early exit checks
                  if not (self.active and (not self.group or self.group.active)):
                      return []
          
                  # 3. Check exit condition for any target
                  for target in self.targets:
                      # Exit if condition returns True
                      if await self._evaluate_condition(self.exit_condition, message, target):
                          raise SystemExit
          
                  # 4. Check stop condition for any target
                  for target in self.targets:
                      # Stop if condition returns True
                      if await self._evaluate_condition(self.stop_condition, message, target):
                          self.disconnect()
                          return []
          
                  # 5. Transform if configured
                  processed_message = message
                  if self.transform_fn:
                      processed_message = await execute(self.transform_fn, message)
                  # 6. First pass: Determine target list
                  target_list: list[MessageNode] = [
                      target
                      for target in self.targets
                      if await self._evaluate_condition(
                          self.filter_condition,
                          processed_message,
                          target,
                          default_return=True,
                      )
                  ]
                  # 7. emit connection processed event
                  self.connection_processed.emit(
                      self.ConnectionProcessed(
                          message=processed_message,
                          source=self.source,
                          targets=target_list,
                          queued=self.queued,
                          connection_type=self.connection_type,  # pyright: ignore
                      )
                  )
                  # 8. if we have targets, update stats and emit message forwarded
                  if target_list:
                      messages = [*self._stats.messages, processed_message]
                      self._stats = replace(self._stats, messages=messages)
                      self.message_forwarded.emit(processed_message)
          
                  # 9. Second pass: Actually process for each target
                  responses: list[ChatMessage[Any]] = []
                  for target in target_list:
                      if self.queued:
                          self._pending_messages[target.name].append(processed_message)
                          continue
                      if response := await self._process_for_target(
                          processed_message, target, prompt
                      ):
                          responses.append(response)
          
                  return responses
          
              async def _process_for_target(
                  self,
                  message: ChatMessage[Any],
                  target: MessageNode,
                  prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None = None,
              ) -> ChatMessage[Any] | None:
                  """Process message for a single target."""
                  from llmling_agent.agent import Agent, StructuredAgent
                  from llmling_agent.delegation.base_team import BaseTeam
          
                  match self.connection_type:
                      case "run":
                          prompts: list[AnyPromptType | PIL.Image.Image | os.PathLike[str]] = [
                              message
                          ]
                          if prompt:
                              prompts.append(prompt)
                          return await target.run(*prompts)
          
                      case "context":
                          meta = {
                              "type": "forwarded_message",
                              "role": message.role,
                              "model": message.model,
                              "cost_info": message.cost_info,
                              "timestamp": message.timestamp.isoformat(),
                              "prompt": prompt,
                          }
          
                          async def add_context():
                              match target:
                                  case BaseTeam():
                                      # Use distribute for teams
                                      await target.distribute(str(message.content), metadata=meta)
                                  case Agent() | StructuredAgent():  # Agent case
                                      # Use existing context message approach
                                      target.conversation.add_context_message(
                                          str(message.content),
                                          source=message.name,
                                          metadata=meta,
                                      )
          
                          if self.delay is not None or self.priority != 0:
                              coro = add_context()
                              target.run_background(coro, priority=self.priority, delay=self.delay)
                          else:
                              await add_context()
                          return None
          
                      case "forward":
                          if self.delay is not None or self.priority != 0:
          
                              async def delayed_emit():
                                  target.outbox.emit(message, prompt)
          
                              coro = delayed_emit()
                              target.run_background(coro, priority=self.priority, delay=self.delay)
                          else:
                              target.outbox.emit(message, prompt)
                          return None
          
              async def trigger(
                  self, prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None = None
              ) -> list[ChatMessage[TTransmittedData]]:
                  """Process queued messages."""
                  if not self._pending_messages:
                      return []
                  match self.queue_strategy:
                      case "buffer":
                          results: list[ChatMessage[TTransmittedData]] = []
                          # Process each agent's queue
                          for target in self.targets:
                              queue = self._pending_messages[target.name]
                              for msg in queue:
                                  if resp := await self._process_for_target(msg, target, prompt):
                                      results.append(resp)  # noqa: PERF401
                              queue.clear()
                          return results
          
                      case "latest":
                          results = []
                          # Get latest message for each agent
                          for target in self.targets:
                              queue = self._pending_messages[target.name]
                              if queue:
                                  latest = queue[-1]
                                  if resp := await self._process_for_target(latest, target, prompt):
                                      results.append(resp)
                                  queue.clear()
                          return results
          
                      case "concat":
                          results = []
                          # Concat messages per agent
                          for target in self.targets:
                              queue = self._pending_messages[target.name]
                              if not queue:
                                  continue
          
                              base = queue[-1]
                              contents = [str(m.content) for m in queue]
                              meta = {
                                  **base.metadata,
                                  "merged_count": len(queue),
                                  "queue_strategy": self.queue_strategy,
                              }
                              content = "\n\n".join(contents)
                              merged = replace(base, content=content, metadata=meta)  # type: ignore
          
                              if response := await self._process_for_target(merged, target, prompt):
                                  results.append(response)
                              queue.clear()
          
                          return results
                      case _:
                          msg = f"Invalid queue strategy: {self.queue_strategy}"
                          raise ValueError(msg)
          
              def when(self, condition: AnyFilterFn) -> Self:
                  """Add condition for message forwarding."""
                  self.filter_condition = condition
                  return self
          
              def transform[TNewData](
                  self,
                  transformer: Callable[
                      [ChatMessage[TTransmittedData]],
                      ChatMessage[TNewData] | Awaitable[ChatMessage[TNewData]],
                  ],
                  *,
                  name: str | None = None,
                  description: str | None = None,
              ) -> Talk[TNewData]:
                  """Chain a new transformation after existing ones.
          
                  Args:
                      transformer: Function to transform messages
                      name: Optional name for debugging
                      description: Optional description
          
                  Returns:
                      New Talk instance with chained transformation
          
                  Example:
                      ```python
                      talk = (agent1 >> agent2)
                          .transform(parse_json)      # str -> dict
                          .transform(extract_values)  # dict -> list
                      ```
                  """
                  new_talk = Talk[TNewData](
                      source=self.source,
                      targets=self.targets,
                      connection_type=self.connection_type,  # type: ignore
                  )
          
                  if self.transform_fn is not None:
                      oldtransform_fn = self.transform_fn
          
                      async def chainedtransform_fn(
                          data: ChatMessage[TTransmittedData],
                      ) -> ChatMessage[TNewData]:
                          intermediate = await execute(oldtransform_fn, data)
                          return await execute(transformer, intermediate)
          
                      new_talk.transform_fn = chainedtransform_fn  # type: ignore
                  else:
                      new_talk.transform_fn = transformer  # type: ignore
          
                  return new_talk
          
              @asynccontextmanager
              async def paused(self):
                  """Temporarily set inactive."""
                  previous = self.active
                  self.active = False
                  try:
                      yield self
                  finally:
                      self.active = previous
          
              def disconnect(self):
                  """Permanently disconnect the connection."""
                  self.active = False
          
              @property
              def stats(self) -> TalkStats:
                  """Get current connection statistics."""
                  return self._stats
          

          stats property

          stats: TalkStats
          

          Get current connection statistics.
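
          For example (illustrative), after some messages have flowed through the connection:

              stats = talk.stats
              print(stats.source_name, "->", stats.target_names)
              print(len(stats.messages), "messages forwarded so far")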

          ConnectionProcessed dataclass

          Event emitted when a message flows through a connection.

          Source code in src/llmling_agent/talk/talk.py, lines 42-51
          @dataclass(frozen=True)
          class ConnectionProcessed:
              """Event emitted when a message flows through a connection."""
          
              message: ChatMessage
              source: MessageEmitter
              targets: list[MessageNode]
              queued: bool
              connection_type: ConnectionType
              timestamp: datetime = field(default_factory=datetime.now)
          

          __init__

          __init__(
              source: MessageEmitter,
              targets: Sequence[MessageNode],
              group: TeamTalk | None = None,
              *,
              name: str | None = None,
              connection_type: ConnectionType = "run",
              wait_for_connections: bool = False,
              priority: int = 0,
              delay: timedelta | None = None,
              queued: bool = False,
              queue_strategy: QueueStrategy = "latest",
              transform: AnyTransformFn[ChatMessage[TTransmittedData]] | None = None,
              filter_condition: AnyFilterFn | None = None,
              stop_condition: AnyFilterFn | None = None,
              exit_condition: AnyFilterFn | None = None,
          )
          

          Initialize talk connection.

          Parameters:

              source (MessageEmitter): Agent sending messages. Required.
              targets (Sequence[MessageNode]): Agents receiving messages. Required.
              group (TeamTalk | None): Optional group this talk belongs to. Default: None.
              name (str | None): Optional name for this talk. Default: None.
              connection_type (ConnectionType): How to handle messages. Default: 'run'.
                  - "run": Execute message as a new run in target
                  - "context": Add message as context to target
                  - "forward": Forward message to target's outbox
              wait_for_connections (bool): Whether to wait for all targets to complete. Default: False.
              priority (int): Task priority (lower = higher priority). Default: 0.
              delay (timedelta | None): Optional delay before processing. Default: None.
              queued (bool): Whether messages should be queued for manual processing. Default: False.
              queue_strategy (QueueStrategy): How to process queued messages. Default: 'latest'.
                  - "concat": Combine all messages with newlines
                  - "latest": Use only the most recent message
                  - "buffer": Process all messages individually
              transform (AnyTransformFn[ChatMessage[TTransmittedData]] | None): Optional function to transform messages. Default: None.
              filter_condition (AnyFilterFn | None): Optional condition for filtering messages. Default: None.
              stop_condition (AnyFilterFn | None): Optional condition for disconnecting. Default: None.
              exit_condition (AnyFilterFn | None): Optional condition for stopping the event loop. Default: None.
          Source code in src/llmling_agent/talk/talk.py, lines 60-122
          def __init__(
              self,
              source: MessageEmitter,
              targets: Sequence[MessageNode],
              group: TeamTalk | None = None,
              *,
              name: str | None = None,
              connection_type: ConnectionType = "run",
              wait_for_connections: bool = False,
              priority: int = 0,
              delay: timedelta | None = None,
              queued: bool = False,
              queue_strategy: QueueStrategy = "latest",
              transform: AnyTransformFn[ChatMessage[TTransmittedData]] | None = None,
              filter_condition: AnyFilterFn | None = None,
              stop_condition: AnyFilterFn | None = None,
              exit_condition: AnyFilterFn | None = None,
          ):
              """Initialize talk connection.
          
              Args:
                  source: Agent sending messages
                  targets: Agents receiving messages
                  group: Optional group this talk belongs to
                  name: Optional name for this talk
                  connection_type: How to handle messages:
                      - "run": Execute message as a new run in target
                      - "context": Add message as context to target
                      - "forward": Forward message to target's outbox
                  wait_for_connections: Whether to wait for all targets to complete
                  priority: Task priority (lower = higher priority)
                  delay: Optional delay before processing
                  queued: Whether messages should be queued for manual processing
                  queue_strategy: How to process queued messages:
                      - "concat": Combine all messages with newlines
                      - "latest": Use only the most recent message
                      - "buffer": Process all messages individually
                  transform: Optional function to transform messages
                  filter_condition: Optional condition for filtering messages
                  stop_condition: Optional condition for disconnecting
                  exit_condition: Optional condition for stopping the event loop
              """
              self.source = source
              self.targets = list(targets)
              # Could perhaps better be an auto-inferring property
              self.name = name or f"{source.name}->{[t.name for t in targets]}"
              self.group = group
              self.priority = priority
              self.delay = delay
              self.active = True
              self.connection_type = connection_type
              self.wait_for_connections = wait_for_connections
              self.queued = queued
              self.queue_strategy = queue_strategy
              self._pending_messages = defaultdict[str, list[ChatMessage[TTransmittedData]]](
                  list
              )
              names = {t.name for t in targets}
              self._stats = TalkStats(source_name=source.name, target_names=names)
              self.transform_fn = transform
              self.filter_condition = filter_condition
              self.stop_condition = stop_condition
              self.exit_condition = exit_condition
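
          Putting the parameters together, a hedged construction sketch; planner and coder stand in
          for MessageEmitter/MessageNode instances (e.g. agents) and are not defined on this page:

              from llmling_agent.talk.talk import Talk

              talk = Talk(
                  source=planner,
                  targets=[coder],
                  connection_type="context",  # add forwarded messages as context on the target
                  queued=True,                # hold messages until trigger() is called
                  queue_strategy="concat",    # merge queued messages with newlines
              )

              # Later, flush the queued messages manually:
              responses = await talk.trigger()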
          

          __rshift__

          __rshift__(
              other: MessageNode[Any, str]
              | ProcessorCallback[str]
              | Sequence[MessageNode[Any, str] | ProcessorCallback[str]],
          ) -> TeamTalk[str]
          
          __rshift__(
              other: MessageNode[Any, Any]
              | ProcessorCallback[Any]
              | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
          ) -> TeamTalk[Any]
          
          __rshift__(
              other: MessageNode[Any, Any]
              | ProcessorCallback[Any]
              | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
          ) -> TeamTalk[Any]
          

          Add another node as target to the connection or group.

          Example:

              connection >> other_agent        # Connect to single agent
              connection >> (agent2 & agent3)  # Connect to group

          Source code in src/llmling_agent/talk/talk.py, lines 144-178
          def __rshift__(
              self,
              other: MessageNode[Any, Any]
              | ProcessorCallback[Any]
              | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
          ) -> TeamTalk[Any]:
              """Add another node as target to the connection or group.
          
              Example:
                  connection >> other_agent  # Connect to single agent
                  connection >> (agent2 & agent3)  # Connect to group
              """
              from llmling_agent.agent import Agent, StructuredAgent
              from llmling_agent.messaging.messagenode import MessageNode
              from llmling_agent.talk import TeamTalk
              from llmling_agent.utils.inspection import has_return_type
          
              match other:
                  case Callable():
                      if has_return_type(other, str):
                          other = Agent.from_callback(other)
                      else:
                          other = StructuredAgent.from_callback(other)
                      if pool := self.source.context.pool:
                          pool.register(other.name, other)
                      return self.__rshift__(other)
                  case Sequence():
                      team_talks = [self.__rshift__(o) for o in other]
                      return TeamTalk([self, *team_talks])
                  case MessageNode():
                      talks = [t.__rshift__(other) for t in self.targets]
                      return TeamTalk([self, *talks])
                  case _:
                      msg = f"Invalid agent type: {type(other)}"
                      raise TypeError(msg)
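
          As the Callable branch above shows, a plain function can also be used as a target: it is
          wrapped via Agent.from_callback (and registered in the pool when one is available). A
          hedged sketch, with agent1 and agent2 as placeholder agents:

              def summarize(text: str) -> str:
                  # str return type, so the callback is wrapped as an Agent
                  return text[:200]

              team_talk = (agent1 >> agent2) >> summarize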
          

          _evaluate_condition async

          _evaluate_condition(
              condition: Callable[..., bool | Awaitable[bool]] | None,
              message: ChatMessage[Any],
              target: MessageNode,
              *,
              default_return: bool = False,
          ) -> bool
          

          Evaluate a condition with flexible parameter handling.

          Source code in src/llmling_agent/talk/talk.py, lines 180-205
          async def _evaluate_condition(
              self,
              condition: Callable[..., bool | Awaitable[bool]] | None,
              message: ChatMessage[Any],
              target: MessageNode,
              *,
              default_return: bool = False,
          ) -> bool:
              """Evaluate a condition with flexible parameter handling."""
              from llmling_agent.talk.registry import EventContext
          
              if not condition:
                  return default_return
              registry = (
                  context.pool.connection_registry
                  if (context := self.source.context) and context.pool
                  else None
              )
              ctx = EventContext(
                  message=message,
                  target=target,
                  stats=self.stats,
                  registry=registry,
                  talk=self,
              )
              return await execute(condition, ctx)
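
          Conditions therefore receive an EventContext carrying message, target, stats, registry
          and talk. An illustrative filter condition (the single-argument signature is an
          assumption; execute() may accept other shapes, hence "flexible parameter handling"):

              def only_long_messages(ctx) -> bool:
                  # ctx is the EventContext built above
                  return len(str(ctx.message.content)) > 20

              talk.when(only_long_messages)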
          

          _handle_message async

          _handle_message(
              message: ChatMessage[TTransmittedData], prompt: str | None = None
          ) -> list[ChatMessage[Any]]
          

          Handle message forwarding based on connection configuration.

          Source code in src/llmling_agent/talk/talk.py, lines 244-309
          async def _handle_message(
              self,
              message: ChatMessage[TTransmittedData],
              prompt: str | None = None,
          ) -> list[ChatMessage[Any]]:
              """Handle message forwarding based on connection configuration."""
              # 2. Early exit checks
              if not (self.active and (not self.group or self.group.active)):
                  return []
          
              # 3. Check exit condition for any target
              for target in self.targets:
                  # Exit if condition returns True
                  if await self._evaluate_condition(self.exit_condition, message, target):
                      raise SystemExit
          
              # 4. Check stop condition for any target
              for target in self.targets:
                  # Stop if condition returns True
                  if await self._evaluate_condition(self.stop_condition, message, target):
                      self.disconnect()
                      return []
          
              # 5. Transform if configured
              processed_message = message
              if self.transform_fn:
                  processed_message = await execute(self.transform_fn, message)
              # 6. First pass: Determine target list
              target_list: list[MessageNode] = [
                  target
                  for target in self.targets
                  if await self._evaluate_condition(
                      self.filter_condition,
                      processed_message,
                      target,
                      default_return=True,
                  )
              ]
              # 7. emit connection processed event
              self.connection_processed.emit(
                  self.ConnectionProcessed(
                      message=processed_message,
                      source=self.source,
                      targets=target_list,
                      queued=self.queued,
                      connection_type=self.connection_type,  # pyright: ignore
                  )
              )
              # 8. if we have targets, update stats and emit message forwarded
              if target_list:
                  messages = [*self._stats.messages, processed_message]
                  self._stats = replace(self._stats, messages=messages)
                  self.message_forwarded.emit(processed_message)
          
              # 9. Second pass: Actually process for each target
              responses: list[ChatMessage[Any]] = []
              for target in target_list:
                  if self.queued:
                      self._pending_messages[target.name].append(processed_message)
                      continue
                  if response := await self._process_for_target(
                      processed_message, target, prompt
                  ):
                      responses.append(response)
          
              return responses
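
          The condition callables evaluated above receive the EventContext built in _evaluate_condition, so filter, stop, and exit conditions can be written as plain async callables. A minimal sketch follows; `agent1`/`agent2`, the direct assignment to `stop_condition`, and the assumption that TalkStats exposes the forwarded `messages` list are illustrative, not confirmed API.

          ```python
          # Sketch only -- not library source. Conditions take the EventContext and
          # return a bool.
          async def only_substantial(ctx) -> bool:
              """Filter: forward only messages with non-trivial content."""
              return len(str(ctx.message.content)) > 20


          async def stop_after_five(ctx) -> bool:
              """Stop: disconnect once five messages have been forwarded (assumes
              TalkStats exposes the forwarded messages)."""
              return len(ctx.stats.messages) >= 5


          talk = agent1 >> agent2           # Talk created via the >> operator
          talk.when(only_substantial)       # sets filter_condition
          talk.stop_condition = stop_after_five   # assumed to be settable directly
          ```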
          

          _process_for_target async

          _process_for_target(
              message: ChatMessage[Any],
              target: MessageNode,
              prompt: AnyPromptType | Image | PathLike[str] | None = None,
          ) -> ChatMessage[Any] | None
          

          Process message for a single target.

          Source code in src/llmling_agent/talk/talk.py, lines 311-370
          async def _process_for_target(
              self,
              message: ChatMessage[Any],
              target: MessageNode,
              prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None = None,
          ) -> ChatMessage[Any] | None:
              """Process message for a single target."""
              from llmling_agent.agent import Agent, StructuredAgent
              from llmling_agent.delegation.base_team import BaseTeam
          
              match self.connection_type:
                  case "run":
                      prompts: list[AnyPromptType | PIL.Image.Image | os.PathLike[str]] = [
                          message
                      ]
                      if prompt:
                          prompts.append(prompt)
                      return await target.run(*prompts)
          
                  case "context":
                      meta = {
                          "type": "forwarded_message",
                          "role": message.role,
                          "model": message.model,
                          "cost_info": message.cost_info,
                          "timestamp": message.timestamp.isoformat(),
                          "prompt": prompt,
                      }
          
                      async def add_context():
                          match target:
                              case BaseTeam():
                                  # Use distribute for teams
                                  await target.distribute(str(message.content), metadata=meta)
                              case Agent() | StructuredAgent():  # Agent case
                                  # Use existing context message approach
                                  target.conversation.add_context_message(
                                      str(message.content),
                                      source=message.name,
                                      metadata=meta,
                                  )
          
                      if self.delay is not None or self.priority != 0:
                          coro = add_context()
                          target.run_background(coro, priority=self.priority, delay=self.delay)
                      else:
                          await add_context()
                      return None
          
                  case "forward":
                      if self.delay is not None or self.priority != 0:
          
                          async def delayed_emit():
                              target.outbox.emit(message, prompt)
          
                          coro = delayed_emit()
                          target.run_background(coro, priority=self.priority, delay=self.delay)
                      else:
                          target.outbox.emit(message, prompt)
                      return None
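
          The connection_type literal selects which of the three match arms above handles each message. The sketch below maps the literals to their behavior; the agent names are hypothetical, and whether connection_type is meant to be mutated after creation (rather than fixed when the connection is made) is an assumption.

          ```python
          # Sketch only: connection_type is a real Talk attribute; setting it like this
          # is illustrative.
          talk = analyzer >> summarizer            # hypothetical agents
          talk.connection_type = "context"         # other literals: "run", "forward"
          # "run":     the target runs the message as a prompt and the response is collected
          # "context": the message is added to the target's conversation/context
          # "forward": the message is re-emitted on the target's outbox
          ```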
          

          disconnect

          disconnect()
          

          Permanently disconnect the connection.

          Source code in src/llmling_agent/talk/talk.py, lines 492-494
          def disconnect(self):
              """Permanently disconnect the connection."""
              self.active = False
          

          on_event

          on_event(
              event_type: ConnectionEventType,
              callback: Callable[[ConnectionEventData[TTransmittedData]], None | Awaitable[None]],
          ) -> Self
          

          Register callback for connection events.

          Source code in src/llmling_agent/talk/talk.py, lines 207-222
          def on_event(
              self,
              event_type: ConnectionEventType,
              callback: Callable[
                  [ConnectionEventData[TTransmittedData]], None | Awaitable[None]
              ],
          ) -> Self:
              """Register callback for connection events."""
              from llmling_agent.messaging.events import ConnectionEventData
          
              async def wrapped_callback(event: EventData):
                  if isinstance(event, ConnectionEventData) and event.event_type == event_type:
                      await execute(callback, event)
          
              self.source._events.add_callback(wrapped_callback)
              return self
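
          A usage sketch: the callback may be sync or async and receives the ConnectionEventData. The event-type string below is a placeholder, not a confirmed ConnectionEventType literal; check llmling_agent for the actual values.

          ```python
          # Sketch only -- "message_processed" is a hypothetical event-type literal.
          async def log_event(event) -> None:
              print(f"connection event: {event.event_type}")

          talk = agent1 >> agent2
          talk.on_event("message_processed", log_event)  # returns the Talk, so calls chain
          ```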
          

          paused async

          paused()
          

          Temporarily set inactive.

          Source code in src/llmling_agent/talk/talk.py, lines 482-490
          @asynccontextmanager
          async def paused(self):
              """Temporarily set inactive."""
              previous = self.active
              self.active = False
              try:
                  yield self
              finally:
                  self.active = previous
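
          A usage sketch: while paused, _handle_message's early-exit check drops incoming messages, and the previous active state is restored on exit. The maintenance coroutine named here is hypothetical.

          ```python
          async with talk.paused():
              await reconfigure_targets()   # hypothetical maintenance work
          # forwarding resumes (if it was active before) once the block exits
          ```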
          

          transform

          transform(
              transformer: Callable[
                  [ChatMessage[TTransmittedData]],
                  ChatMessage[TNewData] | Awaitable[ChatMessage[TNewData]],
              ],
              *,
              name: str | None = None,
              description: str | None = None,
          ) -> Talk[TNewData]
          

          Chain a new transformation after existing ones.

          Parameters:

          transformer (Callable[[ChatMessage[TTransmittedData]], ChatMessage[TNewData] | Awaitable[ChatMessage[TNewData]]]):
              Function to transform messages. Required.
          name (str | None):
              Optional name for debugging. Default: None.
          description (str | None):
              Optional description. Default: None.

          Returns:

          Talk[TNewData]:
              New Talk instance with chained transformation.

          Example
          talk = (
              (agent1 >> agent2)
              .transform(parse_json)      # str -> dict
              .transform(extract_values)  # dict -> list
          )
          
          Source code in src/llmling_agent/talk/talk.py, lines 434-480
          def transform[TNewData](
              self,
              transformer: Callable[
                  [ChatMessage[TTransmittedData]],
                  ChatMessage[TNewData] | Awaitable[ChatMessage[TNewData]],
              ],
              *,
              name: str | None = None,
              description: str | None = None,
          ) -> Talk[TNewData]:
              """Chain a new transformation after existing ones.
          
              Args:
                  transformer: Function to transform messages
                  name: Optional name for debugging
                  description: Optional description
          
              Returns:
                  New Talk instance with chained transformation
          
              Example:
                  ```python
                talk = (
                    (agent1 >> agent2)
                    .transform(parse_json)      # str -> dict
                    .transform(extract_values)  # dict -> list
                )
                  ```
              """
              new_talk = Talk[TNewData](
                  source=self.source,
                  targets=self.targets,
                  connection_type=self.connection_type,  # type: ignore
              )
          
              if self.transform_fn is not None:
                  oldtransform_fn = self.transform_fn
          
                  async def chainedtransform_fn(
                      data: ChatMessage[TTransmittedData],
                  ) -> ChatMessage[TNewData]:
                      intermediate = await execute(oldtransform_fn, data)
                      return await execute(transformer, intermediate)
          
                  new_talk.transform_fn = chainedtransform_fn  # type: ignore
              else:
                  new_talk.transform_fn = transformer  # type: ignore
          
              return new_talk
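
          A sketch of the two transformers referenced in the docstring example. ChatMessage is handled with dataclasses.replace elsewhere in this module (see trigger()), so replace() is used here to swap the payload; the exact ChatMessage field set and the agent names are assumptions.

          ```python
          import json
          from dataclasses import replace


          async def parse_json(msg):            # ChatMessage[str] -> ChatMessage[dict]
              return replace(msg, content=json.loads(msg.content))


          async def extract_values(msg):        # ChatMessage[dict] -> ChatMessage[list]
              return replace(msg, content=list(msg.content.values()))


          talk = (agent1 >> agent2).transform(parse_json).transform(extract_values)
          ```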
          

          trigger async

          trigger(
              prompt: AnyPromptType | Image | PathLike[str] | None = None,
          ) -> list[ChatMessage[TTransmittedData]]
          

          Process queued messages.

          Source code in src/llmling_agent/talk/talk.py, lines 372-427
          async def trigger(
              self, prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None = None
          ) -> list[ChatMessage[TTransmittedData]]:
              """Process queued messages."""
              if not self._pending_messages:
                  return []
              match self.queue_strategy:
                  case "buffer":
                      results: list[ChatMessage[TTransmittedData]] = []
                      # Process each agent's queue
                      for target in self.targets:
                          queue = self._pending_messages[target.name]
                          for msg in queue:
                              if resp := await self._process_for_target(msg, target, prompt):
                                  results.append(resp)  # noqa: PERF401
                          queue.clear()
                      return results
          
                  case "latest":
                      results = []
                      # Get latest message for each agent
                      for target in self.targets:
                          queue = self._pending_messages[target.name]
                          if queue:
                              latest = queue[-1]
                              if resp := await self._process_for_target(latest, target, prompt):
                                  results.append(resp)
                              queue.clear()
                      return results
          
                  case "concat":
                      results = []
                      # Concat messages per agent
                      for target in self.targets:
                          queue = self._pending_messages[target.name]
                          if not queue:
                              continue
          
                          base = queue[-1]
                          contents = [str(m.content) for m in queue]
                          meta = {
                              **base.metadata,
                              "merged_count": len(queue),
                              "queue_strategy": self.queue_strategy,
                          }
                          content = "\n\n".join(contents)
                          merged = replace(base, content=content, metadata=meta)  # type: ignore
          
                          if response := await self._process_for_target(merged, target, prompt):
                              results.append(response)
                          queue.clear()
          
                      return results
                  case _:
                      msg = f"Invalid queue strategy: {self.queue_strategy}"
                      raise ValueError(msg)
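
          A sketch of the queued flow: with a queued connection, _handle_message only buffers messages per target and trigger() drains the queues according to queue_strategy. Both queued and queue_strategy are real Talk attributes, but setting them after creation (rather than when the connection is made) is an assumption, as are the agent names.

          ```python
          # Sketch only.
          talk = producer >> consumer        # hypothetical agents
          talk.queued = True
          talk.queue_strategy = "concat"     # or "buffer" / "latest"

          # ... messages from `producer` are now buffered instead of delivered ...
          responses = await talk.trigger()   # "concat" merges each target's queue into one message
          ```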
          

          when

          when(condition: AnyFilterFn) -> Self
          

          Add condition for message forwarding.

          Source code in src/llmling_agent/talk/talk.py, lines 429-432
          def when(self, condition: AnyFilterFn) -> Self:
              """Add condition for message forwarding."""
              self.filter_condition = condition
              return self
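
          A one-line usage sketch: the condition receives the EventContext described above, and when() returns the Talk for chaining. That AnyFilterFn also accepts plain (non-async) callables is an assumption here.

          ```python
          talk = (agent1 >> agent2).when(lambda ctx: ctx.message.role == "assistant")
          ```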
          

          TalkStats dataclass

          Bases: MessageStats

          Statistics for a single connection.

          Source code in src/llmling_agent/talk/stats.py, lines 60-65
          @dataclass(frozen=True, kw_only=True)
          class TalkStats(MessageStats):
              """Statistics for a single connection."""
          
              source_name: str | None
              target_names: set[str]
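
          A sketch of reading these statistics off a connection: Talk.stats returns a TalkStats with the two fields above; that the forwarded messages list comes from the MessageStats base class is an assumption.

          ```python
          stats = talk.stats
          print(stats.source_name, stats.target_names, len(stats.messages))
          ```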
          

          TeamTalk

          Bases: list['Talk | TeamTalk']

          Group of connections with aggregate operations.

          Source code in src/llmling_agent/talk/talk.py, lines 502-620
          class TeamTalk[TTransmittedData](list["Talk | TeamTalk"]):
              """Group of connections with aggregate operations."""
          
              def __init__(
                  self, talks: Sequence[Talk[TTransmittedData] | TeamTalk[TTransmittedData]]
              ):
                  super().__init__(talks)
                  self.filter_condition: AnyFilterFn | None = None
                  self.active = True
          
              def __repr__(self):
                  return f"TeamTalk({list(self)})"
          
              def __rshift__(
                  self,
                  other: MessageNode[Any, Any]
                  | ProcessorCallback[Any]
                  | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
              ) -> TeamTalk[Any]:
                  """Add another node as target to the connection or group.
          
                  Example:
                      connection >> other_agent  # Connect to single agent
                      connection >> (agent2 & agent3)  # Connect to group
                  """
                  from llmling_agent.agent import Agent, StructuredAgent
                  from llmling_agent.messaging.messagenode import MessageNode
                  from llmling_agent.talk import TeamTalk
                  from llmling_agent.utils.inspection import has_return_type
          
                  match other:
                      case Callable():
                          if has_return_type(other, str):
                              other = Agent.from_callback(other)
                          else:
                              other = StructuredAgent.from_callback(other)
                          for talk_ in self.iter_talks():
                              if pool := talk_.source.context.pool:
                                  pool.register(other.name, other)
                                  break
                          return self.__rshift__(other)
                      case Sequence():
                          team_talks = [self.__rshift__(o) for o in other]
                          return TeamTalk([self, *team_talks])
                      case MessageNode():
                          talks = [t.connect_to(other) for t in self.targets]
                          return TeamTalk([self, *talks])
                      case _:
                          msg = f"Invalid agent type: {type(other)}"
                          raise TypeError(msg)
          
              @property
              def targets(self) -> list[MessageNode]:
                  """Get all targets from all connections."""
                  return [t for talk in self for t in talk.targets]
          
              def iter_talks(self) -> Iterator[Talk]:
                  """Get all contained talks."""
                  for t in self:
                      match t:
                          case Talk():
                              yield t
                          case TeamTalk():
                              yield from t.iter_talks()
          
              async def _handle_message(self, message: ChatMessage[Any], prompt: str | None = None):
                  for talk in self:
                      await talk._handle_message(message, prompt)
          
              async def trigger(
                  self, prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None = None
              ) -> list[ChatMessage]:
                  messages = []
                  for talk in self:
                      messages.extend(await talk.trigger(prompt))
                  return messages
          
              @classmethod
              def from_nodes(
                  cls,
                  agents: Sequence[MessageNode],
                  targets: list[MessageNode] | None = None,
              ) -> Self:
                  """Create TeamTalk from a collection of agents."""
                  return cls([Talk(agent, targets or []) for agent in agents])
          
              @asynccontextmanager
              async def paused(self):
                  """Temporarily set inactive."""
                  previous = self.active
                  self.active = False
                  try:
                      yield self
                  finally:
                      self.active = previous
          
              def has_active_talks(self) -> bool:
                  """Check if any contained talks are active."""
                  return any(talk.active for talk in self)
          
              def get_active_talks(self) -> list[Talk | TeamTalk]:
                  """Get list of currently active talks."""
                  return [talk for talk in self if talk.active]
          
              @property
              def stats(self) -> AggregatedTalkStats:
                  """Get aggregated statistics for all connections."""
                  return AggregatedTalkStats(stats=[talk.stats for talk in self])
          
              def when(self, condition: AnyFilterFn) -> Self:
                  """Add condition to all connections in group."""
                  for talk in self:
                      talk.when(condition)
                  return self
          
              def disconnect(self):
                  """Disconnect all connections in group."""
                  for talk in self:
                      talk.disconnect()
          

          stats property

          stats: AggregatedTalkStats
          

          Get aggregated statistics for all connections.

          targets property

          targets: list[MessageNode]
          

          Get all targets from all connections.

          __rshift__

          __rshift__(
              other: MessageNode[Any, Any]
              | ProcessorCallback[Any]
              | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
          ) -> TeamTalk[Any]
          

          Add another node as target to the connection or group.

          Example

          connection >> other_agent        # Connect to single agent
          connection >> (agent2 & agent3)  # Connect to group

          Source code in src/llmling_agent/talk/talk.py, lines 515-551
          def __rshift__(
              self,
              other: MessageNode[Any, Any]
              | ProcessorCallback[Any]
              | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
          ) -> TeamTalk[Any]:
              """Add another node as target to the connection or group.
          
              Example:
                  connection >> other_agent  # Connect to single agent
                  connection >> (agent2 & agent3)  # Connect to group
              """
              from llmling_agent.agent import Agent, StructuredAgent
              from llmling_agent.messaging.messagenode import MessageNode
              from llmling_agent.talk import TeamTalk
              from llmling_agent.utils.inspection import has_return_type
          
              match other:
                  case Callable():
                      if has_return_type(other, str):
                          other = Agent.from_callback(other)
                      else:
                          other = StructuredAgent.from_callback(other)
                      for talk_ in self.iter_talks():
                          if pool := talk_.source.context.pool:
                              pool.register(other.name, other)
                              break
                      return self.__rshift__(other)
                  case Sequence():
                      team_talks = [self.__rshift__(o) for o in other]
                      return TeamTalk([self, *team_talks])
                  case MessageNode():
                      talks = [t.connect_to(other) for t in self.targets]
                      return TeamTalk([self, *talks])
                  case _:
                      msg = f"Invalid agent type: {type(other)}"
                      raise TypeError(msg)
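
          A usage sketch: build a group of connections with from_nodes, then fan it out further with >>; as the MessageNode arm above shows, every current target of the group gets connected to the new node. The agent names below are hypothetical.

          ```python
          from llmling_agent.talk import TeamTalk

          team_talk = TeamTalk.from_nodes([agent1, agent2], targets=[collector])
          wider = team_talk >> reviewer    # every current target now also feeds `reviewer`
          ```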
          

          disconnect

          disconnect()
          

          Disconnect all connections in group.

          Source code in src/llmling_agent/talk/talk.py, lines 617-620
          def disconnect(self):
              """Disconnect all connections in group."""
              for talk in self:
                  talk.disconnect()
          

          from_nodes classmethod

          from_nodes(
              agents: Sequence[MessageNode], targets: list[MessageNode] | None = None
          ) -> Self
          

          Create TeamTalk from a collection of agents.

          Source code in src/llmling_agent/talk/talk.py, lines 579-586
          @classmethod
          def from_nodes(
              cls,
              agents: Sequence[MessageNode],
              targets: list[MessageNode] | None = None,
          ) -> Self:
              """Create TeamTalk from a collection of agents."""
              return cls([Talk(agent, targets or []) for agent in agents])
          

          get_active_talks

          get_active_talks() -> list[Talk | TeamTalk]
          

          Get list of currently active talks.

          Source code in src/llmling_agent/talk/talk.py, lines 602-604
          def get_active_talks(self) -> list[Talk | TeamTalk]:
              """Get list of currently active talks."""
              return [talk for talk in self if talk.active]
          

          has_active_talks

          has_active_talks() -> bool
          

          Check if any contained talks are active.

          Source code in src/llmling_agent/talk/talk.py, lines 598-600
          def has_active_talks(self) -> bool:
              """Check if any contained talks are active."""
              return any(talk.active for talk in self)
          

          iter_talks

          iter_talks() -> Iterator[Talk]
          

          Get all contained talks.

          Source code in src/llmling_agent/talk/talk.py, lines 558-565
          def iter_talks(self) -> Iterator[Talk]:
              """Get all contained talks."""
              for t in self:
                  match t:
                      case Talk():
                          yield t
                      case TeamTalk():
                          yield from t.iter_talks()
          

          paused async

          paused()
          

          Temporarily set inactive.

          Source code in src/llmling_agent/talk/talk.py, lines 588-596
          @asynccontextmanager
          async def paused(self):
              """Temporarily set inactive."""
              previous = self.active
              self.active = False
              try:
                  yield self
              finally:
                  self.active = previous
          

          when

          when(condition: AnyFilterFn) -> Self
          

          Add condition to all connections in group.

          Source code in src/llmling_agent/talk/talk.py, lines 611-615
          def when(self, condition: AnyFilterFn) -> Self:
              """Add condition to all connections in group."""
              for talk in self:
                  talk.when(condition)
              return self