
messaging

Class info

Classes

Name (Module)
    Description

AgentResponse (llmling_agent.messaging.messages)
    Result from an agent's execution.
ChatMessage (llmling_agent.messaging.messages)
    Common message format for all UI types.
ChatMessageContainer (llmling_agent.messaging.message_container)
    Container for tracking and managing chat messages.
EventManager (llmling_agent.messaging.event_manager)
    Manages multiple event sources and their lifecycles.
MessageEmitter (llmling_agent.messaging.messageemitter)
    Base class for all message processing nodes.
MessageNode (llmling_agent.messaging.messagenode)
    Base class for all message processing nodes.
TeamResponse (llmling_agent.messaging.messages)
    Results from a team execution.
TokenCost (llmling_agent.messaging.messages)
    Combined token and cost tracking.


              Core messaging classes for LLMling agent.
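
              The classes documented below are importable from the modules listed in the table above; a minimal import sketch:

                  from llmling_agent.messaging.messages import (
                      AgentResponse,
                      ChatMessage,
                      TeamResponse,
                      TokenCost,
                  )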

              AgentResponse dataclass

              Result from an agent's execution.

              Source code in src/llmling_agent/messaging/messages.py
              @dataclass
              class AgentResponse[TResult]:
                  """Result from an agent's execution."""
              
                  agent_name: str
                  """Name of the agent that produced this result"""
              
                  message: ChatMessage[TResult] | None
                  """The actual message with content and metadata"""
              
                  timing: float | None = None
                  """Time taken by this agent in seconds"""
              
                  error: str | None = None
                  """Error message if agent failed"""
              
                  @property
                  def success(self) -> bool:
                      """Whether the agent completed successfully."""
                      return self.error is None
              
                  @property
                  def response(self) -> TResult | None:
                      """Convenient access to message content."""
                      return self.message.content if self.message else None
              

              agent_name instance-attribute

              agent_name: str
              

              Name of the agent that produced this result

              error class-attribute instance-attribute

              error: str | None = None
              

              Error message if agent failed

              message instance-attribute

              message: ChatMessage[TResult] | None
              

              The actual message with content and metadata

              response property

              response: TResult | None
              

              Convenient access to message content.

              success property

              success: bool
              

              Whether the agent completed successfully.

              timing class-attribute instance-attribute

              timing: float | None = None
              

              Time taken by this agent in seconds
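
              A minimal usage sketch (the agent name and content values are made up for illustration):

                  from llmling_agent.messaging.messages import AgentResponse, ChatMessage

                  msg = ChatMessage(content="42", role="assistant", name="solver")
                  result = AgentResponse(agent_name="solver", message=msg, timing=0.8)

                  if result.success:          # True while `error` is None
                      print(result.response)  # "42", shortcut for result.message.content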

              ChatMessage dataclass

              Common message format for all UI types.

              Generically typed with: ChatMessage[Type of Content]. The type can either be str or a BaseModel subclass.
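
              A minimal construction sketch (values are illustrative):

                  from llmling_agent.messaging.messages import ChatMessage

                  # Plain string content:
                  note = ChatMessage(content="Build finished.", role="assistant", name="ci")

                  # Structured content uses a BaseModel subclass as the type parameter, e.g.
                  # ChatMessage[MyResult](content=MyResult(...), role="assistant").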

              Source code in src/llmling_agent/messaging/messages.py
              @dataclass
              class ChatMessage[TContent]:
                  """Common message format for all UI types.
              
                  Generically typed with: ChatMessage[Type of Content]
                  The type can either be str or a BaseModel subclass.
                  """
              
                  content: TContent
                  """Message content, typed as TContent (either str or BaseModel)."""
              
                  role: MessageRole
                  """Role of the message sender (user/assistant)."""
              
                  metadata: SimpleJsonType = field(default_factory=dict)
                  """Additional metadata about the message."""
              
                  timestamp: datetime = field(default_factory=get_now)
                  """When this message was created."""
              
                  cost_info: TokenCost | None = None
                  """Token usage and costs for this specific message if available."""
              
                  message_id: str = field(default_factory=lambda: str(uuid4()))
                  """Unique identifier for this message."""
              
                  conversation_id: str | None = None
                  """ID of the conversation this message belongs to."""
              
                  response_time: float | None = None
                  """Time it took the LLM to respond."""
              
                  associated_messages: list[ChatMessage[Any]] = field(default_factory=list)
                  """List of messages which were generated during the the creation of this messsage."""
              
                  name: str | None = None
                  """Display name for the message sender in UI."""
              
                  forwarded_from: list[str] = field(default_factory=list)
                  """List of agent names (the chain) that forwarded this message to the sender."""
              
                  provider_details: dict[str, Any] = field(default_factory=dict)
                  """Provider specific metadata / extra information."""
              
                  messages: list[ModelMessage] = field(default_factory=list)
                  """List of messages which were generated during the the creation of this messsage."""
              
                  usage: RequestUsage = field(default_factory=RequestUsage)
                  """Usage information for the request.
              
                  This has a default to make tests easier,
                  and to support loading old messages where usage will be missing.
                  """
              
                  model_name: str | None = None
                  """The name of the model that generated the response."""
              
                  provider_name: str | None = None
                  """The name of the LLM provider that generated the response."""
              
                  provider_response_id: str | None = None
                  """request ID as specified by the model provider.
              
                  This can be used to track the specific request to the model."""
              
                  finish_reason: FinishReason | None = None
                  """Reason the model finished generating the response.
              
                  Normalized to OpenTelemetry values."""
              
                  @property
                  def last_message(self) -> ModelMessage:
                      """Return the last message from the message history."""
                      # The response may not be the very last item if it contained an output tool call.
                      # See `CallToolsNode._handle_final_result`.
                      for message in reversed(self.messages):
                          if isinstance(message, ModelRequest | ModelResponse):
                              return message
                      msg = "No last message found in the message history"
                      raise ValueError(msg)  # pragma: no cover
              
                  @property
                  def parts(self) -> Sequence[ModelRequestPart] | Sequence[ModelResponsePart]:
                      """The parts of the last model message."""
                      return self.last_message.parts
              
                  @property
                  def kind(self) -> Literal["request", "response"]:
                      """Role of the message."""
                      match self.role:
                          case "assistant":
                              return "response"
                          case "user":
                              return "request"
              
                  def to_pydantic_ai(self) -> Sequence[ModelMessage]:
                      """Convert this message to a Pydantic model."""
                      if self.messages:
                          return self.messages
                      match self.kind:
                          case "request":
                              return [ModelRequest(parts=self.parts, instructions=None)]  # type: ignore
                          case "response":
                              return [
                                  ModelResponse(
                                      parts=self.parts,  # type: ignore
                                      usage=self.usage,
                                      model_name=self.model_name,
                                      timestamp=self.timestamp,
                                      provider_name=self.provider_name,
                                      provider_details=self.provider_details,
                                      finish_reason=self.finish_reason,
                                      provider_response_id=self.provider_response_id,
                                  )
                              ]
              
                  @classmethod
                  def user_prompt[TPromptContent: str | Sequence[UserContent] = str](
                      cls,
                      message: TPromptContent,
                      conversation_id: str | None = None,
                      instructions: str | None = None,
                  ) -> ChatMessage[TPromptContent]:
                      """Create a user prompt message."""
                      part = UserPromptPart(content=message)
                      request = ModelRequest(parts=[part], instructions=instructions)
                      return ChatMessage(
                          messages=[request],
                          role="user",
                          content=message,
                          conversation_id=conversation_id or str(uuid4()),
                      )
              
                  @classmethod
                  def from_pydantic_ai[TContentType](
                      cls,
                      content: TContentType,
                      message: ModelMessage,
                      conversation_id: str | None = None,
                      name: str | None = None,
                      message_id: str | None = None,
                      forwarded_from: list[str] | None = None,
                  ) -> ChatMessage[TContentType]:
                      """Convert a Pydantic model to a ChatMessage."""
                      match message:
                          case ModelRequest(instructions=_instructions):
                              return ChatMessage(
                                  messages=[message],
                                  content=content,
                                  role="user" if message.kind == "request" else "assistant",
                                  message_id=message_id or str(uuid.uuid4()),
                                  # instructions=instructions,
                                  forwarded_from=forwarded_from or [],
                                  name=name,
                              )
                          case ModelResponse(
                              usage=usage,
                              model_name=model_name,
                              timestamp=timestamp,
                              provider_name=provider_name,
                              provider_details=provider_details,
                              finish_reason=finish_reason,
                              provider_response_id=provider_response_id,
                          ):
                              return ChatMessage(
                                  role="user" if message.kind == "request" else "assistant",
                                  content=content,
                                  messages=[message],
                                  usage=usage,
                                  message_id=message_id or str(uuid.uuid4()),
                                  conversation_id=conversation_id,
                                  model_name=model_name,
                                  timestamp=timestamp,
                                  provider_name=provider_name,
                                  provider_details=provider_details or {},
                                  finish_reason=finish_reason,
                                  provider_response_id=provider_response_id,
                                  name=name,
                                  forwarded_from=forwarded_from or [],
                              )
                          case _:
                              msg = f"Unknown message kind: {message.kind}"
                              raise ValueError(msg)
              
                  @classmethod
                  async def from_run_result[OutputDataT](
                      cls,
                      result: AgentRunResult[OutputDataT],
                      *,
                      agent_name: str | None = None,
                      message_id: str | None = None,
                      conversation_id: str | None = None,
                      response_time: float,
                  ) -> ChatMessage[OutputDataT]:
                      """Create a ChatMessage from a PydanticAI run result.
              
                      Args:
                          result: The PydanticAI run result
                          agent_name: Name of the agent that generated this response
                          message_id: Unique message identifier
                          conversation_id: Conversation identifier
                          response_time: Total time taken for the response
              
                      Returns:
                          A ChatMessage with all fields populated from the result
                      """
                      # Calculate costs
                      run_usage = result.usage()
                      usage = result.response.usage
                      cost_info = await TokenCost.from_usage(
                          model=result.response.model_name or "", usage=run_usage
                      )
              
                      return ChatMessage[OutputDataT](
                          content=result.output,
                          role="assistant",
                          name=agent_name,
                          model_name=result.response.model_name,
                          finish_reason=result.response.finish_reason,
                          messages=result.new_messages(),
                          provider_response_id=result.response.provider_response_id,
                          usage=usage,
                          provider_name=result.response.provider_name,
                          message_id=message_id or str(uuid4()),
                          conversation_id=conversation_id,
                          cost_info=cost_info,
                          response_time=response_time,
                          provider_details={},
                      )
              
                  def forwarded(self, previous_message: ChatMessage[Any]) -> Self:
                      """Create new message showing it was forwarded from another message.
              
                      Args:
                          previous_message: The message that led to this one's creation
              
                      Returns:
                          New message with updated chain showing the path through previous message
                      """
                      from_ = [*previous_message.forwarded_from, previous_message.name or "unknown"]
                      return replace(self, forwarded_from=from_)
              
                  @property
                  def response(self) -> ModelResponse:
                      """Return the last response from the message history."""
                      # The response may not be the very last item if it contained an output tool call.
                      #  See `CallToolsNode._handle_final_result`.
                      for message in reversed(self.messages):
                          if isinstance(message, ModelResponse):
                              return message
                      msg = "No response found in the message history"
                      raise ValueError(msg)
              
                  def to_request(self) -> Self:
                      """Convert this message to a request message.
              
                      If the message is already a request (user role), this is a no-op.
                      If it's a response (assistant role), converts response parts to user content.
              
                      Returns:
                          New ChatMessage with role='user' and converted parts
                      """
                      if self.role == "user":
                          return self  # Already a request, return as-is
              
                      user_content: list[UserContent] = []  # Convert response parts to user content
                      for part in self.parts:
                          match part:
                              case TextPart(content=text_content):
                                  # Text parts become user content strings
                                  user_content.append(text_content)
                              case FilePart(content=binary_content):
                                  # File parts (images, etc.) become user content directly
                                  user_content.append(binary_content)
                              case BaseToolReturnPart(
                                  content=(
                                      ImageUrl()
                                      | AudioUrl()
                                      | DocumentUrl()
                                      | VideoUrl()
                                      | BinaryContent()
                                      | str(),
                                  ) as content
                              ):
                                  user_content.extend(content)
                              case BaseToolReturnPart(
                                  content=(
                                      str()
                                      | ImageUrl()
                                      | AudioUrl()
                                      | DocumentUrl()
                                      | VideoUrl()
                                      | BinaryContent()
                                  ) as content
                              ):
                                  user_content.append(content)
                              case BaseToolReturnPart():
                                  # Tool return parts become user content strings
                                  user_content.append(part.model_response_str())
                              case ToolCallPart():
                                  # Tool call parts become their JSON-encoded arguments
                                  user_content.append(part.args_as_json_str())
                              case _:
                                  pass
              
                      # Create new UserPromptPart with converted content
                      if user_content:
                          converted_parts = [UserPromptPart(content=user_content)]
                      else:
                          converted_parts = [UserPromptPart(content=str(self.content))]
              
                      return replace(
                          self,
                          role="user",
                          messages=[ModelRequest(parts=converted_parts)],
                          cost_info=None,
                          # TODO: what about message_id?
                      )
              
                  @property
                  def data(self) -> TContent:
                      """Get content as typed data. Provides compat to AgentRunResult."""
                      return self.content
              
                  def get_tool_calls(
                      self,
                      tools: dict[str, Any] | None = None,
                      agent_name: str | None = None,
                  ) -> list[ToolCallInfo]:
                      """Extract tool call information from all messages lazily.
              
                      Args:
                          tools: Original Tool set to enrich ToolCallInfos with additional info
                          agent_name: Name of the caller
                      """
                      from uuid import uuid4
              
                      from pydantic_ai import ToolReturnPart
              
                      from llmling_agent.tools import ToolCallInfo
              
                      tools = tools or {}
                      parts = [part for message in self.messages for part in message.parts]
                      call_parts = {
                          part.tool_call_id: part
                          for part in parts
                          if isinstance(part, ToolCallPart) and part.tool_call_id
                      }
              
                      tool_calls = []
                      for part in parts:
                          if isinstance(part, ToolReturnPart) and part.tool_call_id in call_parts:
                              call_part = call_parts[part.tool_call_id]
                              tool_info = ToolCallInfo(
                                  tool_name=call_part.tool_name,
                                  args=call_part.args_as_dict(),
                                  agent_name=agent_name or "UNSET",
                                  result=part.content,
                                  tool_call_id=call_part.tool_call_id or str(uuid4()),
                                  timestamp=part.timestamp,
                                  agent_tool_name=(
                                      t.agent_name if (t := tools.get(part.tool_name)) else None
                                  ),
                              )
                              tool_calls.append(tool_info)
              
                      return tool_calls
              
                  def format(
                      self,
                      style: FormatStyle = "simple",
                      *,
                      template: str | None = None,
                      variables: dict[str, Any] | None = None,
                      show_metadata: bool = False,
                      show_costs: bool = False,
                  ) -> str:
                      """Format message with configurable style.
              
                      Args:
                          style: Predefined style or "custom" for custom template
                          template: Custom Jinja template (required if style="custom")
                          variables: Additional variables for template rendering
                          show_metadata: Whether to include metadata
                          show_costs: Whether to include cost information
              
                      Raises:
                          ValueError: If style is "custom" but no template provided
                                  or if style is invalid
                      """
                      from jinjarope import Environment
                      import yamling
              
                      env = Environment(trim_blocks=True, lstrip_blocks=True)
                      env.filters["to_yaml"] = yamling.dump_yaml
              
                      match style:
                          case "custom":
                              if not template:
                                  msg = "Custom style requires a template"
                                  raise ValueError(msg)
                              template_str = template
                          case _ if style in MESSAGE_TEMPLATES:
                              template_str = MESSAGE_TEMPLATES[style]
                          case _:
                              msg = f"Invalid style: {style}"
                              raise ValueError(msg)
                      template_obj = env.from_string(template_str)
                      vars_ = {
                          **(self.__dict__),
                          "show_metadata": show_metadata,
                          "show_costs": show_costs,
                      }
              
                      if variables:
                          vars_.update(variables)
              
                      return template_obj.render(**vars_)
              

              associated_messages class-attribute instance-attribute

              associated_messages: list[ChatMessage[Any]] = field(default_factory=list)
              

              List of messages which were generated during the creation of this message.

              content instance-attribute

              content: TContent
              

              Message content, typed as TContent (either str or BaseModel).

              conversation_id class-attribute instance-attribute

              conversation_id: str | None = None
              

              ID of the conversation this message belongs to.

              cost_info class-attribute instance-attribute

              cost_info: TokenCost | None = None
              

              Token usage and costs for this specific message if available.

              data property

              data: TContent
              

              Get content as typed data. Provides compatibility with AgentRunResult.

              finish_reason class-attribute instance-attribute

              finish_reason: FinishReason | None = None
              

              Reason the model finished generating the response.

              Normalized to OpenTelemetry values.

              forwarded_from class-attribute instance-attribute

              forwarded_from: list[str] = field(default_factory=list)
              

              List of agent names (the chain) that forwarded this message to the sender.

              kind property

              kind: Literal['request', 'response']
              

              Kind of the message ("request" or "response"), derived from the role.

              last_message property

              last_message: ModelMessage
              

              Return the last message from the message history.

              message_id class-attribute instance-attribute

              message_id: str = field(default_factory=lambda: str(uuid4()))
              

              Unique identifier for this message.

              messages class-attribute instance-attribute

              messages: list[ModelMessage] = field(default_factory=list)
              

              List of messages which were generated during the creation of this message.

              metadata class-attribute instance-attribute

              metadata: SimpleJsonType = field(default_factory=dict)
              

              Additional metadata about the message.

              model_name class-attribute instance-attribute

              model_name: str | None = None
              

              The name of the model that generated the response.

              name class-attribute instance-attribute

              name: str | None = None
              

              Display name for the message sender in UI.

              parts property

              parts: Sequence[ModelRequestPart] | Sequence[ModelResponsePart]
              

              The parts of the last model message.

              provider_details class-attribute instance-attribute

              provider_details: dict[str, Any] = field(default_factory=dict)
              

              Provider specific metadata / extra information.

              provider_name class-attribute instance-attribute

              provider_name: str | None = None
              

              The name of the LLM provider that generated the response.

              provider_response_id class-attribute instance-attribute

              provider_response_id: str | None = None
              

              Request ID as specified by the model provider.

              This can be used to track the specific request to the model.

              response property

              response: ModelResponse
              

              Return the last response from the message history.

              response_time class-attribute instance-attribute

              response_time: float | None = None
              

              Time it took the LLM to respond.

              role instance-attribute

              role: MessageRole
              

              Role of the message sender (user/assistant).

              timestamp class-attribute instance-attribute

              timestamp: datetime = field(default_factory=get_now)
              

              When this message was created.

              usage class-attribute instance-attribute

              usage: RequestUsage = field(default_factory=RequestUsage)
              

              Usage information for the request.

              This has a default to make tests easier, and to support loading old messages where usage will be missing.

              format

              format(
                  style: FormatStyle = "simple",
                  *,
                  template: str | None = None,
                  variables: dict[str, Any] | None = None,
                  show_metadata: bool = False,
                  show_costs: bool = False,
              ) -> str
              

              Format message with configurable style.

              Parameters:

              style (FormatStyle): Predefined style or "custom" for custom template. Default: 'simple'
              template (str | None): Custom Jinja template (required if style="custom"). Default: None
              variables (dict[str, Any] | None): Additional variables for template rendering. Default: None
              show_metadata (bool): Whether to include metadata. Default: False
              show_costs (bool): Whether to include cost information. Default: False

              Raises:

              ValueError: If style is "custom" but no template provided, or if style is invalid.

              Source code in src/llmling_agent/messaging/messages.py
              def format(
                  self,
                  style: FormatStyle = "simple",
                  *,
                  template: str | None = None,
                  variables: dict[str, Any] | None = None,
                  show_metadata: bool = False,
                  show_costs: bool = False,
              ) -> str:
                  """Format message with configurable style.
              
                  Args:
                      style: Predefined style or "custom" for custom template
                      template: Custom Jinja template (required if style="custom")
                      variables: Additional variables for template rendering
                      show_metadata: Whether to include metadata
                      show_costs: Whether to include cost information
              
                  Raises:
                      ValueError: If style is "custom" but no template provided
                              or if style is invalid
                  """
                  from jinjarope import Environment
                  import yamling
              
                  env = Environment(trim_blocks=True, lstrip_blocks=True)
                  env.filters["to_yaml"] = yamling.dump_yaml
              
                  match style:
                      case "custom":
                          if not template:
                              msg = "Custom style requires a template"
                              raise ValueError(msg)
                          template_str = template
                      case _ if style in MESSAGE_TEMPLATES:
                          template_str = MESSAGE_TEMPLATES[style]
                      case _:
                          msg = f"Invalid style: {style}"
                          raise ValueError(msg)
                  template_obj = env.from_string(template_str)
                  vars_ = {
                      **(self.__dict__),
                      "show_metadata": show_metadata,
                      "show_costs": show_costs,
                  }
              
                  if variables:
                      vars_.update(variables)
              
                  return template_obj.render(**vars_)
              

              forwarded

              forwarded(previous_message: ChatMessage[Any]) -> Self
              

              Create new message showing it was forwarded from another message.

              Parameters:

              previous_message (ChatMessage[Any]): The message that led to this one's creation. Required.

              Returns:

              Self: New message with updated chain showing the path through the previous message.
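
              A sketch of how the chain accumulates (agent names are made up):

                  from llmling_agent.messaging.messages import ChatMessage

                  first = ChatMessage(content="route this", role="user", name="router")
                  second = ChatMessage(content="handled", role="assistant", name="worker")

                  relayed = second.forwarded(first)
                  print(relayed.forwarded_from)  # ["router"]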

              Source code in src/llmling_agent/messaging/messages.py
              def forwarded(self, previous_message: ChatMessage[Any]) -> Self:
                  """Create new message showing it was forwarded from another message.
              
                  Args:
                      previous_message: The message that led to this one's creation
              
                  Returns:
                      New message with updated chain showing the path through previous message
                  """
                  from_ = [*previous_message.forwarded_from, previous_message.name or "unknown"]
                  return replace(self, forwarded_from=from_)
              

              from_pydantic_ai classmethod

              from_pydantic_ai(
                  content: TContentType,
                  message: ModelMessage,
                  conversation_id: str | None = None,
                  name: str | None = None,
                  message_id: str | None = None,
                  forwarded_from: list[str] | None = None,
              ) -> ChatMessage[TContentType]
              

              Convert a Pydantic model to a ChatMessage.
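
              A minimal sketch, assuming pydantic-ai's ModelRequest and UserPromptPart message types:

                  from pydantic_ai.messages import ModelRequest, UserPromptPart

                  from llmling_agent.messaging.messages import ChatMessage

                  request = ModelRequest(parts=[UserPromptPart(content="ping")])
                  msg = ChatMessage.from_pydantic_ai(content="ping", message=request)
                  assert msg.role == "user"  # derived from message.kind == "request"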

              Source code in src/llmling_agent/messaging/messages.py
              @classmethod
              def from_pydantic_ai[TContentType](
                  cls,
                  content: TContentType,
                  message: ModelMessage,
                  conversation_id: str | None = None,
                  name: str | None = None,
                  message_id: str | None = None,
                  forwarded_from: list[str] | None = None,
              ) -> ChatMessage[TContentType]:
                  """Convert a Pydantic model to a ChatMessage."""
                  match message:
                      case ModelRequest(instructions=_instructions):
                          return ChatMessage(
                              messages=[message],
                              content=content,
                              role="user" if message.kind == "request" else "assistant",
                              message_id=message_id or str(uuid.uuid4()),
                              # instructions=instructions,
                              forwarded_from=forwarded_from or [],
                              name=name,
                          )
                      case ModelResponse(
                          usage=usage,
                          model_name=model_name,
                          timestamp=timestamp,
                          provider_name=provider_name,
                          provider_details=provider_details,
                          finish_reason=finish_reason,
                          provider_response_id=provider_response_id,
                      ):
                          return ChatMessage(
                              role="user" if message.kind == "request" else "assistant",
                              content=content,
                              messages=[message],
                              usage=usage,
                              message_id=message_id or str(uuid.uuid4()),
                              conversation_id=conversation_id,
                              model_name=model_name,
                              timestamp=timestamp,
                              provider_name=provider_name,
                              provider_details=provider_details or {},
                              finish_reason=finish_reason,
                              provider_response_id=provider_response_id,
                              name=name,
                              forwarded_from=forwarded_from or [],
                          )
                      case _:
                          msg = f"Unknown message kind: {message.kind}"
                          raise ValueError(msg)
              

              from_run_result async classmethod

              from_run_result(
                  result: AgentRunResult[OutputDataT],
                  *,
                  agent_name: str | None = None,
                  message_id: str | None = None,
                  conversation_id: str | None = None,
                  response_time: float,
              ) -> ChatMessage[OutputDataT]
              

              Create a ChatMessage from a PydanticAI run result.

              Parameters:

              result (AgentRunResult[OutputDataT]): The PydanticAI run result. Required.
              agent_name (str | None): Name of the agent that generated this response. Default: None
              message_id (str | None): Unique message identifier. Default: None
              conversation_id (str | None): Conversation identifier. Default: None
              response_time (float): Total time taken for the response. Required.

              Returns:

              ChatMessage[OutputDataT]: A ChatMessage with all fields populated from the result.
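
              A usage sketch; it needs a real pydantic-ai run (and thus model credentials), and the model string, agent name, and timing are placeholders:

                  import asyncio

                  from pydantic_ai import Agent

                  from llmling_agent.messaging.messages import ChatMessage

                  async def main() -> None:
                      agent = Agent("openai:gpt-4o-mini")  # any configured model
                      run = await agent.run("Say hello")   # AgentRunResult[str]
                      msg = await ChatMessage.from_run_result(
                          run,
                          agent_name="greeter",
                          response_time=0.0,  # measure this yourself in real code
                      )
                      print(msg.content, msg.cost_info)

                  asyncio.run(main())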

              Source code in src/llmling_agent/messaging/messages.py
              @classmethod
              async def from_run_result[OutputDataT](
                  cls,
                  result: AgentRunResult[OutputDataT],
                  *,
                  agent_name: str | None = None,
                  message_id: str | None = None,
                  conversation_id: str | None = None,
                  response_time: float,
              ) -> ChatMessage[OutputDataT]:
                  """Create a ChatMessage from a PydanticAI run result.
              
                  Args:
                      result: The PydanticAI run result
                      agent_name: Name of the agent that generated this response
                      message_id: Unique message identifier
                      conversation_id: Conversation identifier
                      response_time: Total time taken for the response
              
                  Returns:
                      A ChatMessage with all fields populated from the result
                  """
                  # Calculate costs
                  run_usage = result.usage()
                  usage = result.response.usage
                  cost_info = await TokenCost.from_usage(
                      model=result.response.model_name or "", usage=run_usage
                  )
              
                  return ChatMessage[OutputDataT](
                      content=result.output,
                      role="assistant",
                      name=agent_name,
                      model_name=result.response.model_name,
                      finish_reason=result.response.finish_reason,
                      messages=result.new_messages(),
                      provider_response_id=result.response.provider_response_id,
                      usage=usage,
                      provider_name=result.response.provider_name,
                      message_id=message_id or str(uuid4()),
                      conversation_id=conversation_id,
                      cost_info=cost_info,
                      response_time=response_time,
                      provider_details={},
                  )
              

              get_tool_calls

              get_tool_calls(
                  tools: dict[str, Any] | None = None, agent_name: str | None = None
              ) -> list[ToolCallInfo]
              

              Extract tool call information from all messages lazily.

              Parameters:

              tools (dict[str, Any] | None): Original Tool set to enrich ToolCallInfos with additional info. Default: None
              agent_name (str | None): Name of the caller. Default: None
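
              A sketch, assuming `msg` is a ChatMessage produced by a run that invoked tools:

                  for call in msg.get_tool_calls(agent_name="greeter"):
                      print(call.tool_name, call.args, call.result)
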
              Source code in src/llmling_agent/messaging/messages.py
              def get_tool_calls(
                  self,
                  tools: dict[str, Any] | None = None,
                  agent_name: str | None = None,
              ) -> list[ToolCallInfo]:
                  """Extract tool call information from all messages lazily.
              
                  Args:
                      tools: Original Tool set to enrich ToolCallInfos with additional info
                      agent_name: Name of the caller
                  """
                  from uuid import uuid4
              
                  from pydantic_ai import ToolReturnPart
              
                  from llmling_agent.tools import ToolCallInfo
              
                  tools = tools or {}
                  parts = [part for message in self.messages for part in message.parts]
                  call_parts = {
                      part.tool_call_id: part
                      for part in parts
                      if isinstance(part, ToolCallPart) and part.tool_call_id
                  }
              
                  tool_calls = []
                  for part in parts:
                      if isinstance(part, ToolReturnPart) and part.tool_call_id in call_parts:
                          call_part = call_parts[part.tool_call_id]
                          tool_info = ToolCallInfo(
                              tool_name=call_part.tool_name,
                              args=call_part.args_as_dict(),
                              agent_name=agent_name or "UNSET",
                              result=part.content,
                              tool_call_id=call_part.tool_call_id or str(uuid4()),
                              timestamp=part.timestamp,
                              agent_tool_name=(
                                  t.agent_name if (t := tools.get(part.tool_name)) else None
                              ),
                          )
                          tool_calls.append(tool_info)
              
                  return tool_calls
              

              to_pydantic_ai

              to_pydantic_ai() -> Sequence[ModelMessage]
              

              Convert this message to a Pydantic model.
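
              A short sketch: for a message created via user_prompt, the stored ModelRequest is returned unchanged:

                  from llmling_agent.messaging.messages import ChatMessage

                  prompt = ChatMessage.user_prompt("What changed in the last release?")
                  history = prompt.to_pydantic_ai()  # [ModelRequest(...)], ready to pass back to a model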

              Source code in src/llmling_agent/messaging/messages.py
              def to_pydantic_ai(self) -> Sequence[ModelMessage]:
                  """Convert this message to a Pydantic model."""
                  if self.messages:
                      return self.messages
                  match self.kind:
                      case "request":
                          return [ModelRequest(parts=self.parts, instructions=None)]  # type: ignore
                      case "response":
                          return [
                              ModelResponse(
                                  parts=self.parts,  # type: ignore
                                  usage=self.usage,
                                  model_name=self.model_name,
                                  timestamp=self.timestamp,
                                  provider_name=self.provider_name,
                                  provider_details=self.provider_details,
                                  finish_reason=self.finish_reason,
                                  provider_response_id=self.provider_response_id,
                              )
                          ]
              

              to_request

              to_request() -> Self
              

              Convert this message to a request message.

              If the message is already a request (user role), this is a no-op. If it's a response (assistant role), converts response parts to user content.

              Returns:

              Type Description
              Self

              New ChatMessage with role='user' and converted parts

              Source code in src/llmling_agent/messaging/messages.py
              def to_request(self) -> Self:
                  """Convert this message to a request message.
              
                  If the message is already a request (user role), this is a no-op.
                  If it's a response (assistant role), converts response parts to user content.
              
                  Returns:
                      New ChatMessage with role='user' and converted parts
                  """
                  if self.role == "user":
                      return self  # Already a request, return as-is
              
                  user_content: list[UserContent] = []  # Convert response parts to user content
                  for part in self.parts:
                      match part:
                          case TextPart(content=text_content):
                              # Text parts become user content strings
                              user_content.append(text_content)
                          case FilePart(content=binary_content):
                              # File parts (images, etc.) become user content directly
                              user_content.append(binary_content)
                          case BaseToolReturnPart(
                              content=(
                                  ImageUrl()
                                  | AudioUrl()
                                  | DocumentUrl()
                                  | VideoUrl()
                                  | BinaryContent()
                                  | str(),
                              ) as content
                          ):
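                              # A one-element sequence of media/URL content is unpacked into user content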
                              user_content.extend(content)
                          case BaseToolReturnPart(
                              content=(
                                  str()
                                  | ImageUrl()
                                  | AudioUrl()
                                  | DocumentUrl()
                                  | VideoUrl()
                                  | BinaryContent()
                              ) as content
                          ):
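                              # Single media/URL content is forwarded directly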
                              user_content.append(content)
                          case BaseToolReturnPart():
                              # Tool return parts become user content strings
                              user_content.append(part.model_response_str())
                          case ToolCallPart():
                              # Tool call parts become their arguments as a JSON string
                              user_content.append(part.args_as_json_str())
                          case _:
                              pass
              
                  # Create new UserPromptPart with converted content
                  if user_content:
                      converted_parts = [UserPromptPart(content=user_content)]
                  else:
                      converted_parts = [UserPromptPart(content=str(self.content))]
              
                  return replace(
                      self,
                      role="user",
                      messages=[ModelRequest(parts=converted_parts)],
                      cost_info=None,
                      # TODO: what about message_id?
                  )
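
              A hedged round-trip sketch: response_msg is assumed to be an assistant ChatMessage from a previous run; converting it yields a user message whose parts are wrapped in a single UserPromptPart.

              # response_msg is assumed to be a ChatMessage with role "assistant"
              request_msg = response_msg.to_request()
              assert request_msg.role == "user"
              # Calling to_request() on a message that is already a request is a no-op:
              assert request_msg.to_request() is request_msg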
              

              user_prompt classmethod

              user_prompt(
                  message: TPromptContent,
                  conversation_id: str | None = None,
                  instructions: str | None = None,
              ) -> ChatMessage[TPromptContent]
              

              Create a user prompt message.

              Source code in src/llmling_agent/messaging/messages.py
              @classmethod
              def user_prompt[TPromptContent: str | Sequence[UserContent] = str](
                  cls,
                  message: TPromptContent,
                  conversation_id: str | None = None,
                  instructions: str | None = None,
              ) -> ChatMessage[TPromptContent]:
                  """Create a user prompt message."""
                  part = UserPromptPart(content=message)
                  request = ModelRequest(parts=[part], instructions=instructions)
                  return ChatMessage(
                      messages=[request],
                      role="user",
                      content=message,
                      conversation_id=conversation_id or str(uuid4()),
                  )
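
              A minimal sketch of creating a prompt message; the conversation id is illustrative (a fresh uuid4 is generated when it is omitted).

              msg = ChatMessage.user_prompt(
                  "What changed in the last release?",
                  conversation_id="release-chat",  # illustrative; defaults to a new uuid4
                  instructions="Answer in one sentence.",
              )
              assert msg.role == "user"
              assert msg.content == "What changed in the last release?"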
              

              ChatMessageContainer

              Bases: EventedList[ChatMessage[Any]]

              Container for tracking and managing chat messages.

              Extends EventedList to provide:

              - Message statistics (tokens, costs)
              - History formatting
              - Token-aware context window management
              - Role-based filtering

              Source code in src/llmling_agent/messaging/message_container.py
              class ChatMessageContainer(EventedList[ChatMessage[Any]]):
                  """Container for tracking and managing chat messages.
              
                  Extends EventedList to provide:
                  - Message statistics (tokens, costs)
                  - History formatting
                  - Token-aware context window management
                  - Role-based filtering
                  """
              
                  def get_message_tokens(self, message: ChatMessage[Any]) -> int:
                      """Get token count for a single message.
              
                      Uses cost_info if available, falls back to tiktoken estimation.
              
                      Args:
                          message: Message to count tokens for
              
                      Returns:
                          Token count for the message
                      """
                      if message.cost_info:
                          return message.cost_info.token_usage.total_tokens
                      return count_tokens(str(message.content), message.model_name)
              
                  def get_history_tokens(self, fallback_model: str | None = None) -> int:
                      """Get total token count for all messages.
              
                      Uses cost_info when available, falls back to tiktoken estimation
                      for messages without usage information.
              
                      Args:
                          fallback_model: Model name used for tiktoken estimation
                              when no message provides one

                      Returns:
                          Total token count across all messages
                      """
                      # Use cost_info if available
                      total = sum(m.cost_info.token_usage.total_tokens for m in self if m.cost_info)
              
                      # For messages without cost_info, estimate using tiktoken
                      if msgs := [msg for msg in self if not msg.cost_info]:
                          if fallback_model:
                              model_name = fallback_model
                          else:
                              model_name = next(
                                  (m.model_name for m in self if m.model_name), DEFAULT_TOKEN_MODEL
                              )
                          contents = [str(msg.content) for msg in msgs]
                          total += sum(batch_count_tokens(contents, model_name))
              
                      return total
              
                  def get_total_cost(self) -> float:
                      """Calculate total cost in USD across all messages.
              
                      Only includes messages with cost information.
              
                      Returns:
                          Total cost in USD
                      """
                      return sum(float(msg.cost_info.total_cost) for msg in self if msg.cost_info)
              
                  @property
                  def last_message(self) -> ChatMessage[Any] | None:
                      """Get most recent message or None if empty."""
                      return self[-1] if self else None
              
                  def format(self, *, style: FormatStyle = "simple", **kwargs: Any) -> str:
                      """Format conversation history with configurable style.
              
                      Args:
                          style: Formatting style to use
                          **kwargs: Additional formatting options passed to message.format()
              
                      Returns:
                          Formatted conversation history as string
                      """
                      return "\n".join(msg.format(style=style, **kwargs) for msg in self)
              
                  def filter_by_role(
                      self,
                      role: MessageRole,
                      *,
                      max_messages: int | None = None,
                  ) -> list[ChatMessage[Any]]:
                      """Get messages with specific role.
              
                      Args:
                          role: Role to filter by (user/assistant/system)
                          max_messages: Optional limit on number of messages to return
              
                      Returns:
                          List of messages with matching role
                      """
                      messages = [msg for msg in self if msg.role == role]
                      if max_messages:
                          messages = messages[-max_messages:]
                      return messages
              
                  def get_between(
                      self,
                      *,
                      start_time: datetime | None = None,
                      end_time: datetime | None = None,
                  ) -> list[ChatMessage[Any]]:
                      """Get messages within a time range.
              
                      Args:
                          start_time: Optional start of range
                          end_time: Optional end of range
              
                      Returns:
                          List of messages within the time range
                      """
                      messages = list(self)
                      if start_time:
                          messages = [msg for msg in messages if msg.timestamp >= start_time]
                      if end_time:
                          messages = [msg for msg in messages if msg.timestamp <= end_time]
                      return messages
              
                  def _build_flow_dag(self, message: ChatMessage[Any]) -> DAGNode | None:
                      """Build DAG from message flow.
              
                      Args:
                          message: Message to build flow DAG for
              
                      Returns:
                          Root DAGNode of the graph
                      """
                      from bigtree import DAGNode
              
                      # Get messages from this conversation
                      conv_messages = [m for m in self if m.conversation_id == message.conversation_id]
                      nodes: dict[str, DAGNode] = {}
                      for msg in conv_messages:  # First create all nodes
                          if msg.forwarded_from:
                              chain = [*msg.forwarded_from, msg.name or "unknown"]
                              for name in chain:
                                  if name not in nodes:
                                      nodes[name] = DAGNode(name)
              
                      # Then set up parent relationships
                      for msg in conv_messages:
                          if msg.forwarded_from:
                              chain = [*msg.forwarded_from, msg.name or "unknown"]
                              # Connect consecutive nodes
                              for parent_name, child_name in itertools.pairwise(chain):
                                  parent = nodes[parent_name]
                                  child = nodes[child_name]
                                  if parent not in child.parents:
                                      child.parents = [*child.parents, parent]
              
                      # Find root nodes (those without parents)
                      roots = [node for node in nodes.values() if not node.parents]
                      if not roots:
                          return None
                      return roots[0]  # Return first root for now
              
                  def to_mermaid_graph(
                      self,
                      message: ChatMessage[Any],
                      *,
                      title: str = "",
                      theme: str | None = None,
                      rankdir: Literal["TB", "BT", "LR", "RL"] = "LR",
                  ) -> str:
                      """Convert message flow to mermaid graph."""
                      from bigtree import dag_to_list
              
                      dag = self._build_flow_dag(message)
                      if not dag:
                          return ""
              
                      # Get list of connections
                      connections = dag_to_list(dag)
              
                      # Convert to mermaid
                      lines = ["```mermaid"]
                      if title:
                          lines.extend(["---", f"title: {title}", "---"])
                      if theme:
                          lines.append(f'%%{{ init: {{ "theme": "{theme}" }} }}%%')
                      lines.append(f"flowchart {rankdir}")
              
                      # Add connections
                      for parent, child in connections:
                          lines.append(f"    {parent}-->{child}")
              
                      lines.append("```")
                      return "\n".join(lines)
              

              last_message property

              last_message: ChatMessage[Any] | None
              

              Get most recent message or None if empty.
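
              A tiny guard-style sketch, since the property returns None for an empty container (history is assumed to be a populated ChatMessageContainer):

              if (msg := history.last_message) is not None:
                  print(msg.format())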

              filter_by_role

              filter_by_role(
                  role: MessageRole, *, max_messages: int | None = None
              ) -> list[ChatMessage[Any]]
              

              Get messages with specific role.

              Parameters:

              Name Type Description Default
              role MessageRole

              Role to filter by (user/assistant/system)

              required
              max_messages int | None

              Optional limit on number of messages to return

              None

              Returns:

              Type Description
              list[ChatMessage[Any]]

              List of messages with matching role

              Source code in src/llmling_agent/messaging/message_container.py
              def filter_by_role(
                  self,
                  role: MessageRole,
                  *,
                  max_messages: int | None = None,
              ) -> list[ChatMessage[Any]]:
                  """Get messages with specific role.
              
                  Args:
                      role: Role to filter by (user/assistant/system)
                      max_messages: Optional limit on number of messages to return
              
                  Returns:
                      List of messages with matching role
                  """
                  messages = [msg for msg in self if msg.role == role]
                  if max_messages:
                      messages = messages[-max_messages:]
                  return messages
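
              A minimal sketch, assuming the container was filled during a conversation:

              from llmling_agent.messaging.message_container import ChatMessageContainer

              history = ChatMessageContainer()  # assumed to be populated elsewhere
              last_two_user = history.filter_by_role("user", max_messages=2)
              assistant_msgs = history.filter_by_role("assistant")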
              

              format

              format(*, style: FormatStyle = 'simple', **kwargs: Any) -> str
              

              Format conversation history with configurable style.

              Parameters:

              Name Type Description Default
              style FormatStyle

              Formatting style to use

              'simple'
              **kwargs Any

              Additional formatting options passed to message.format()

              {}

              Returns:

              Type Description
              str

              Formatted conversation history as string

              Source code in src/llmling_agent/messaging/message_container.py
              def format(self, *, style: FormatStyle = "simple", **kwargs: Any) -> str:
                  """Format conversation history with configurable style.
              
                  Args:
                      style: Formatting style to use
                      **kwargs: Additional formatting options passed to message.format()
              
                  Returns:
                      Formatted conversation history as string
                  """
                  return "\n".join(msg.format(style=style, **kwargs) for msg in self)
              

              get_between

              get_between(
                  *, start_time: datetime | None = None, end_time: datetime | None = None
              ) -> list[ChatMessage[Any]]
              

              Get messages within a time range.

              Parameters:

              Name Type Description Default
              start_time datetime | None

              Optional start of range

              None
              end_time datetime | None

              Optional end of range

              None

              Returns:

              Type Description
              list[ChatMessage[Any]]

              List of messages within the time range

              Source code in src/llmling_agent/messaging/message_container.py
              def get_between(
                  self,
                  *,
                  start_time: datetime | None = None,
                  end_time: datetime | None = None,
              ) -> list[ChatMessage[Any]]:
                  """Get messages within a time range.
              
                  Args:
                      start_time: Optional start of range
                      end_time: Optional end of range
              
                  Returns:
                      List of messages within the time range
                  """
                  messages = list(self)
                  if start_time:
                      messages = [msg for msg in messages if msg.timestamp >= start_time]
                  if end_time:
                      messages = [msg for msg in messages if msg.timestamp <= end_time]
                  return messages
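
              A hedged sketch selecting the last hour of messages; it assumes message timestamps are timezone-aware (UTC here is illustrative):

              from datetime import datetime, timedelta, timezone

              one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
              recent = history.get_between(start_time=one_hour_ago)
              # Pass end_time as well to select a closed window.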
              

              get_history_tokens

              get_history_tokens(fallback_model: str | None = None) -> int
              

              Get total token count for all messages.

              Uses cost_info when available, falls back to tiktoken estimation for messages without usage information.

              Parameters:

              Name Type Description Default
              fallback_model str | None

              Model name used for tiktoken estimation when no message provides one

              None

              Returns:

              Type Description
              int

              Total token count across all messages

              Source code in src/llmling_agent/messaging/message_container.py
              def get_history_tokens(self, fallback_model: str | None = None) -> int:
                  """Get total token count for all messages.
              
                  Uses cost_info when available, falls back to tiktoken estimation
                  for messages without usage information.
              
                  Args:
                      fallback_model: Model name used for tiktoken estimation
                          when no message provides one

                  Returns:
                      Total token count across all messages
                  """
                  # Use cost_info if available
                  total = sum(m.cost_info.token_usage.total_tokens for m in self if m.cost_info)
              
                  # For messages without cost_info, estimate using tiktoken
                  if msgs := [msg for msg in self if not msg.cost_info]:
                      if fallback_model:
                          model_name = fallback_model
                      else:
                          model_name = next(
                              (m.model_name for m in self if m.model_name), DEFAULT_TOKEN_MODEL
                          )
                      contents = [str(msg.content) for msg in msgs]
                      total += sum(batch_count_tokens(contents, model_name))
              
                  return total
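
              A one-line sketch; the model name is illustrative:

              total = history.get_history_tokens(fallback_model="gpt-4o-mini")
              # Exact counts from cost_info where present; tiktoken estimates for the rest.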
              

              get_message_tokens

              get_message_tokens(message: ChatMessage[Any]) -> int
              

              Get token count for a single message.

              Uses cost_info if available, falls back to tiktoken estimation.

              Parameters:

              Name Type Description Default
              message ChatMessage[Any]

              Message to count tokens for

              required

              Returns:

              Type Description
              int

              Token count for the message

              Source code in src/llmling_agent/messaging/message_container.py
              def get_message_tokens(self, message: ChatMessage[Any]) -> int:
                  """Get token count for a single message.
              
                  Uses cost_info if available, falls back to tiktoken estimation.
              
                  Args:
                      message: Message to count tokens for
              
                  Returns:
                      Token count for the message
                  """
                  if message.cost_info:
                      return message.cost_info.token_usage.total_tokens
                  return count_tokens(str(message.content), message.model_name)
              

              get_total_cost

              get_total_cost() -> float
              

              Calculate total cost in USD across all messages.

              Only includes messages with cost information.

              Returns:

              Type Description
              float

              Total cost in USD

              Source code in src/llmling_agent/messaging/message_container.py
              def get_total_cost(self) -> float:
                  """Calculate total cost in USD across all messages.
              
                  Only includes messages with cost information.
              
                  Returns:
                      Total cost in USD
                  """
                  return sum(float(msg.cost_info.total_cost) for msg in self if msg.cost_info)
              

              to_mermaid_graph

              to_mermaid_graph(
                  message: ChatMessage[Any],
                  *,
                  title: str = "",
                  theme: str | None = None,
                  rankdir: Literal["TB", "BT", "LR", "RL"] = "LR",
              ) -> str
              

              Convert message flow to mermaid graph.

              Source code in src/llmling_agent/messaging/message_container.py
              def to_mermaid_graph(
                  self,
                  message: ChatMessage[Any],
                  *,
                  title: str = "",
                  theme: str | None = None,
                  rankdir: Literal["TB", "BT", "LR", "RL"] = "LR",
              ) -> str:
                  """Convert message flow to mermaid graph."""
                  from bigtree import dag_to_list
              
                  dag = self._build_flow_dag(message)
                  if not dag:
                      return ""
              
                  # Get list of connections
                  connections = dag_to_list(dag)
              
                  # Convert to mermaid
                  lines = ["```mermaid"]
                  if title:
                      lines.extend(["---", f"title: {title}", "---"])
                  if theme:
                      lines.append(f'%%{{ init: {{ "theme": "{theme}" }} }}%%')
                  lines.append(f"flowchart {rankdir}")
              
                  # Add connections
                  for parent, child in connections:
                      lines.append(f"    {parent}-->{child}")
              
                  lines.append("```")
                  return "\n".join(lines)
              

              EventManager

              Manages multiple event sources and their lifecycles.

              Source code in src/llmling_agent/messaging/event_manager.py
              class EventManager:
                  """Manages multiple event sources and their lifecycles."""
              
                  event_processed = Signal(EventData)
              
                  def __init__(
                      self,
                      node: MessageEmitter[Any, Any],
                      enable_events: bool = True,
                      auto_run: bool = True,
                  ):
                      """Initialize event manager.
              
                      Args:
                          node: Agent to manage events for
                          enable_events: Whether to enable event processing
                          auto_run: Whether to automatically call run() for event callbacks
                      """
                      self.node = node
                      self.enabled = enable_events
                      self._sources: dict[str, EventSource] = {}
                      self._callbacks: list[EventCallback] = []
                      self.auto_run = auto_run
                      self._observers = defaultdict[str, list[EventObserver]](list)
              
                  async def _default_handler(self, event: EventData):
                      """Default event handler that converts events to node runs."""
                      if prompt := event.to_prompt():  # Only run if event provides a prompt
                          await self.node.run(prompt)
              
                  def add_callback(self, callback: EventCallback):
                      """Register an event callback."""
                      self._callbacks.append(callback)
              
                  def remove_callback(self, callback: EventCallback):
                      """Remove a previously registered callback."""
                      self._callbacks.remove(callback)
              
                  async def emit_event(self, event: EventData):
                      """Emit event to all callbacks and optionally handle via node."""
                      if not self.enabled:
                          return
              
                      # Run custom callbacks
                      for callback in self._callbacks:
                          try:
                              result = callback(event)
                              if isinstance(result, Awaitable):
                                  await result
                          except Exception:
                              logger.exception("Error in event callback", name=callback.__name__)
              
                      # Run default handler if enabled
                      if self.auto_run:
                          try:
                              prompt = event.to_prompt()
                              if prompt:
                                  await self.node.run(prompt)
                          except Exception:
                              logger.exception("Error in default event handler")
                      self.event_processed.emit(event)
              
                  async def add_file_watch(
                      self,
                      paths: str | Sequence[str],
                      *,
                      name: str | None = None,
                      extensions: list[str] | None = None,
                      ignore_paths: list[str] | None = None,
                      recursive: bool = True,
                      debounce: int = 1600,
                  ) -> EventSource:
                      """Add file system watch event source.
              
                      Args:
                          paths: Paths or patterns to watch
                          name: Optional source name (default: generated from paths)
                          extensions: File extensions to monitor
                          ignore_paths: Paths to ignore
                          recursive: Whether to watch subdirectories
                          debounce: Minimum time between events (ms)
                      """
                      path_list = [paths] if isinstance(paths, str) else list(paths)
                      config = FileWatchConfig(
                          name=name or f"file_watch_{len(self._sources)}",
                          paths=path_list,
                          extensions=extensions,
                          ignore_paths=ignore_paths,
                          recursive=recursive,
                          debounce=debounce,
                      )
                      return await self.add_source(config)
              
                  async def add_webhook(
                      self,
                      path: str,
                      *,
                      name: str | None = None,
                      port: int = 8000,
                      secret: str | None = None,
                  ) -> EventSource:
                      """Add webhook event source.
              
                      Args:
                          path: URL path to listen on
                          name: Optional source name
                          port: Port to listen on
                          secret: Optional secret for request validation
                      """
                      name = name or f"webhook_{len(self._sources)}"
                      sec = SecretStr(secret) if secret else None
                      config = WebhookConfig(name=name, path=path, port=port, secret=sec)
                      return await self.add_source(config)
              
                  async def add_timed_event(
                      self,
                      schedule: str,
                      prompt: str,
                      *,
                      name: str | None = None,
                      timezone: str | None = None,
                      skip_missed: bool = False,
                  ) -> TimeEventSource:
                      """Add time-based event source.
              
                      Args:
                          schedule: Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am)
                          prompt: Prompt to send when triggered
                          name: Optional source name
                          timezone: Optional timezone (system default if None)
                          skip_missed: Whether to skip missed executions
                      """
                      config = TimeEventConfig(
                          name=name or f"timed_{len(self._sources)}",
                          schedule=schedule,
                          prompt=prompt,
                          timezone=timezone,
                          skip_missed=skip_missed,
                      )
                      return await self.add_source(config)  # type: ignore
              
                  async def add_email_watch(
                      self,
                      host: str,
                      username: str,
                      password: str,
                      *,
                      name: str | None = None,
                      port: int = 993,
                      folder: str = "INBOX",
                      ssl: bool = True,
                      check_interval: int = 60,
                      mark_seen: bool = True,
                      filters: dict[str, str] | None = None,
                      max_size: int | None = None,
                  ) -> EventSource:
                      """Add email monitoring event source.
              
                      Args:
                          host: IMAP server hostname
                          username: Email account username
                          password: Account password or app password
                          name: Optional source name
                          port: Server port (default: 993 for IMAP SSL)
                          folder: Mailbox to monitor
                          ssl: Whether to use SSL/TLS
                          check_interval: Seconds between checks
                          mark_seen: Whether to mark processed emails as seen
                          filters: Optional email filtering criteria
                          max_size: Maximum email size in bytes
                      """
                      config = EmailConfig(
                          name=name or f"email_{len(self._sources)}",
                          host=host,
                          username=username,
                          password=SecretStr(password),
                          port=port,
                          folder=folder,
                          ssl=ssl,
                          check_interval=check_interval,
                          mark_seen=mark_seen,
                          filters=filters or {},
                          max_size=max_size,
                      )
                      return await self.add_source(config)
              
                  async def add_source(self, config: EventConfig) -> EventSource:
                      """Add and start a new event source.
              
                      Args:
                          config: Event source configuration
              
                      Raises:
                          ValueError: If source already exists or is invalid
                      """
                      logger.debug("Setting up event source", name=config.name, type=config.type)
                      from evented.base import EventSource
              
                      if config.name in self._sources:
                          msg = f"Event source already exists: {config.name}"
                          raise ValueError(msg)
              
                      try:
                          source = EventSource.from_config(config)
                          await source.__aenter__()
                          self._sources[config.name] = source
                          # Start processing events
                          name = f"event_processor_{config.name}"
                          self.node.task_manager.create_task(self._process_events(source), name=name)
                          logger.debug("Added event source", name=config.name)
                      except Exception as e:
                          msg = "Failed to add event source"
                          logger.exception(msg, name=config.name)
                          raise RuntimeError(msg) from e
                      else:
                          return source
              
                  async def remove_source(self, name: str):
                      """Stop and remove an event source.
              
                      Args:
                          name: Name of source to remove
                      """
                      if source := self._sources.pop(name, None):
                          await source.__aexit__(None, None, None)
                          logger.debug("Removed event source", name=name)
              
                  async def _process_events(self, source: EventSource):
                      """Process events from a source.
              
                      Args:
                          source: Event source to process
                      """
                      try:
                      # Iterate events as the source yields them
                          async for event in source.events():
                              if not self.enabled:
                                  logger.debug("Event processing disabled, skipping event")
                                  continue
                              await self.emit_event(event)
              
                      except asyncio.CancelledError:
                          logger.debug("Event processing cancelled")
                          raise
              
                      except Exception:
                          logger.exception("Error processing events")
              
                  async def cleanup(self):
                      """Clean up all event sources and tasks."""
                      self.enabled = False
              
                      for name in list(self._sources):
                          await self.remove_source(name)
              
                  async def __aenter__(self) -> Self:
                      """Allow using manager as async context manager."""
                      if not self.enabled:
                          return self
              
                      # Set up triggers from config
                      if (
                          self.node.context
                          and self.node.context.config
                          and self.node.context.config.triggers
                      ):
                          for trigger in self.node.context.config.triggers:
                              await self.add_source(trigger)
              
                      return self
              
                  async def __aexit__(
                      self,
                      exc_type: type[BaseException] | None,
                      exc_val: BaseException | None,
                      exc_tb: TracebackType | None,
                  ) -> None:
                      """Clean up when exiting context."""
                      await self.cleanup()
              
                  @overload
                  def track[T](
                      self,
                      event_name: str | None = None,
                      **event_metadata: Any,
                  ) -> Callable[[Callable[..., T]], Callable[..., T]]: ...
              
                  @overload
                  def track[T](
                      self,
                      event_name: str | None = None,
                      **event_metadata: Any,
                  ) -> Callable[
                      [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
                  ]: ...
              
                  def track(
                      self,
                      event_name: str | None = None,
                      **event_metadata: Any,
                  ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                      """Track function calls as events.
              
                      Args:
                          event_name: Optional name for the event (defaults to function name)
                          **event_metadata: Additional metadata to include with event
              
                      Example:
                          @event_manager.track("user_search")
                          async def search_docs(query: str) -> list[Doc]:
                              results = await search(query)
                              return results  # This result becomes event data
                      """
              
                      def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                          name = event_name or func.__name__
              
                          @wraps(func)
                          async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                              start_time = get_now()
                              meta = {"args": args, "kwargs": kwargs, **event_metadata}
                              try:
                                  result = await func(*args, **kwargs)
                                  if self.enabled:
                                      meta |= {"status": "success", "duration": get_now() - start_time}
                                      event = EventData.create(name, content=result, metadata=meta)
                                      await self.emit_event(event)
                              except Exception as e:
                                  if self.enabled:
                                      dur = get_now() - start_time
                                      meta |= {"status": "error", "error": str(e), "duration": dur}
                                      event = EventData.create(name, content=str(e), metadata=meta)
                                      await self.emit_event(event)
                                  raise
                              else:
                                  return result
              
                          @wraps(func)
                          def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                              start_time = get_now()
                              meta = {"args": args, "kwargs": kwargs, **event_metadata}
                              try:
                                  result = func(*args, **kwargs)
                                  if self.enabled:
                                      meta |= {"status": "success", "duration": get_now() - start_time}
                                      event = EventData.create(name, content=result, metadata=meta)
                                      self.node.task_manager.run_background(self.emit_event(event))
                              except Exception as e:
                                  if self.enabled:
                                      dur = get_now() - start_time
                                      meta |= {"status": "error", "error": str(e), "duration": dur}
                                      event = EventData.create(name, content=str(e), metadata=meta)
                                      self.node.task_manager.run_background(self.emit_event(event))
                                  raise
                              else:
                                  return result
              
                          return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
              
                      return decorator
              
                  @overload
                  def poll[T](
                      self,
                      event_type: str,
                      interval: timedelta | None = None,
                  ) -> Callable[[Callable[..., T]], Callable[..., T]]: ...
              
                  @overload
                  def poll[T](
                      self,
                      event_type: str,
                      interval: timedelta | None = None,
                  ) -> Callable[
                      [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
                  ]: ...
              
                  def poll(
                      self,
                      event_type: str,
                      interval: timedelta | None = None,
                  ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                      """Decorator to register an event observer.
              
                      Args:
                          event_type: Type of event to observe
                          interval: Optional polling interval for periodic checks
              
                      Example:
                           @event_manager.poll("file_changed")
                          async def handle_file_change(event: FileEventData):
                              await process_file(event.path)
                      """
              
                      def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                          observer = EventObserver(func, interval=interval)
                          self._observers[event_type].append(observer)
              
                          @wraps(func)
                          async def wrapper(*args: Any, **kwargs: Any) -> Any:
                              result = await execute(func, *args, **kwargs)
                              # Convert result to event and emit
                              if self.enabled:
                                  typ = type(result).__name__
                                  meta = {"type": "function_result", "output_type": typ}
                                  event = FunctionResultEventData(
                                      result=result, source=event_type, metadata=meta
                                  )
                                  await self.emit_event(event)
                              return result
              
                          return wrapper
              
                      return decorator
              

              __aenter__ async

              __aenter__() -> Self
              

              Allow using manager as async context manager.

              Source code in src/llmling_agent/messaging/event_manager.py
              async def __aenter__(self) -> Self:
                  """Allow using manager as async context manager."""
                  if not self.enabled:
                      return self
              
                  # Set up triggers from config
                  if (
                      self.node.context
                      and self.node.context.config
                      and self.node.context.config.triggers
                  ):
                      for trigger in self.node.context.config.triggers:
                          await self.add_source(trigger)
              
                  return self
              

              __aexit__ async

              __aexit__(
                  exc_type: type[BaseException] | None,
                  exc_val: BaseException | None,
                  exc_tb: TracebackType | None,
              ) -> None
              

              Clean up when exiting context.

              Source code in src/llmling_agent/messaging/event_manager.py
              async def __aexit__(
                  self,
                  exc_type: type[BaseException] | None,
                  exc_val: BaseException | None,
                  exc_tb: TracebackType | None,
              ) -> None:
                  """Clean up when exiting context."""
                  await self.cleanup()
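
               Taken together, __aenter__ and __aexit__ let the manager drive a node's event
               lifecycle. A minimal sketch, assuming `node` is an already constructed
               MessageEmitter whose config may define triggers:

                   # Sources registered on entry keep running until the block exits,
                   # at which point cleanup() stops them and their processing tasks.
                   async with EventManager(node, enable_events=True) as events:
                       await events.add_file_watch("./docs")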
              

              __init__

              __init__(
                  node: MessageEmitter[Any, Any], enable_events: bool = True, auto_run: bool = True
              )
              

              Initialize event manager.

               Parameters:

                   node (MessageEmitter[Any, Any]): Agent to manage events for. Required.
                   enable_events (bool): Whether to enable event processing. Default: True.
                   auto_run (bool): Whether to automatically call run() for event callbacks. Default: True.
              Source code in src/llmling_agent/messaging/event_manager.py
              def __init__(
                  self,
                  node: MessageEmitter[Any, Any],
                  enable_events: bool = True,
                  auto_run: bool = True,
              ):
                  """Initialize event manager.
              
                  Args:
                      node: Agent to manage events for
                      enable_events: Whether to enable event processing
                      auto_run: Whether to automatically call run() for event callbacks
                  """
                  self.node = node
                  self.enabled = enable_events
                  self._sources: dict[str, EventSource] = {}
                  self._callbacks: list[EventCallback] = []
                  self.auto_run = auto_run
                  self._observers = defaultdict[str, list[EventObserver]](list)
              

              add_callback

              add_callback(callback: EventCallback)
              

              Register an event callback.

              Source code in src/llmling_agent/messaging/event_manager.py
              def add_callback(self, callback: EventCallback):
                  """Register an event callback."""
                  self._callbacks.append(callback)
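
               Callbacks may be sync or async; emit_event awaits any awaitable result. A small
               sketch, assuming `event_manager` is an active EventManager:

                   async def log_event(event: EventData) -> None:
                       # runs for every event passed to emit_event; async results are awaited
                       print("event received:", event)

                   event_manager.add_callback(log_event)
                   # deregister later with event_manager.remove_callback(log_event)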
              

              add_email_watch async

              add_email_watch(
                  host: str,
                  username: str,
                  password: str,
                  *,
                  name: str | None = None,
                  port: int = 993,
                  folder: str = "INBOX",
                  ssl: bool = True,
                  check_interval: int = 60,
                  mark_seen: bool = True,
                  filters: dict[str, str] | None = None,
                  max_size: int | None = None,
              ) -> EventSource
              

              Add email monitoring event source.

               Parameters:

                   host (str): IMAP server hostname. Required.
                   username (str): Email account username. Required.
                   password (str): Account password or app password. Required.
                   name (str | None): Optional source name. Default: None.
                   port (int): Server port. Default: 993 (IMAP SSL).
                   folder (str): Mailbox to monitor. Default: 'INBOX'.
                   ssl (bool): Whether to use SSL/TLS. Default: True.
                   check_interval (int): Seconds between checks. Default: 60.
                   mark_seen (bool): Whether to mark processed emails as seen. Default: True.
                   filters (dict[str, str] | None): Optional email filtering criteria. Default: None.
                   max_size (int | None): Maximum email size in bytes. Default: None.
              Source code in src/llmling_agent/messaging/event_manager.py
              async def add_email_watch(
                  self,
                  host: str,
                  username: str,
                  password: str,
                  *,
                  name: str | None = None,
                  port: int = 993,
                  folder: str = "INBOX",
                  ssl: bool = True,
                  check_interval: int = 60,
                  mark_seen: bool = True,
                  filters: dict[str, str] | None = None,
                  max_size: int | None = None,
              ) -> EventSource:
                  """Add email monitoring event source.
              
                  Args:
                      host: IMAP server hostname
                      username: Email account username
                      password: Account password or app password
                      name: Optional source name
                      port: Server port (default: 993 for IMAP SSL)
                      folder: Mailbox to monitor
                      ssl: Whether to use SSL/TLS
                      check_interval: Seconds between checks
                      mark_seen: Whether to mark processed emails as seen
                      filters: Optional email filtering criteria
                      max_size: Maximum email size in bytes
                  """
                  config = EmailConfig(
                      name=name or f"email_{len(self._sources)}",
                      host=host,
                      username=username,
                      password=SecretStr(password),
                      port=port,
                      folder=folder,
                      ssl=ssl,
                      check_interval=check_interval,
                      mark_seen=mark_seen,
                      filters=filters or {},
                      max_size=max_size,
                  )
                  return await self.add_source(config)
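
               A usage sketch with purely illustrative credentials; the filter keys are
               assumed here to follow IMAP-style search criteria, since the config passes
               them through unchanged:

                   source = await event_manager.add_email_watch(
                       host="imap.example.com",       # hypothetical server
                       username="agent@example.com",
                       password="app-password",       # stored internally as SecretStr
                       check_interval=120,
                       filters={"FROM": "alerts@example.com"},  # assumed IMAP-style criteria
                   )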
              

              add_file_watch async

              add_file_watch(
                  paths: str | Sequence[str],
                  *,
                  name: str | None = None,
                  extensions: list[str] | None = None,
                  ignore_paths: list[str] | None = None,
                  recursive: bool = True,
                  debounce: int = 1600,
              ) -> EventSource
              

              Add file system watch event source.

               Parameters:

                   paths (str | Sequence[str]): Paths or patterns to watch. Required.
                   name (str | None): Optional source name; generated from paths if None. Default: None.
                   extensions (list[str] | None): File extensions to monitor. Default: None.
                   ignore_paths (list[str] | None): Paths to ignore. Default: None.
                   recursive (bool): Whether to watch subdirectories. Default: True.
                   debounce (int): Minimum time between events (ms). Default: 1600.
              Source code in src/llmling_agent/messaging/event_manager.py
              async def add_file_watch(
                  self,
                  paths: str | Sequence[str],
                  *,
                  name: str | None = None,
                  extensions: list[str] | None = None,
                  ignore_paths: list[str] | None = None,
                  recursive: bool = True,
                  debounce: int = 1600,
              ) -> EventSource:
                  """Add file system watch event source.
              
                  Args:
                      paths: Paths or patterns to watch
                      name: Optional source name (default: generated from paths)
                      extensions: File extensions to monitor
                      ignore_paths: Paths to ignore
                      recursive: Whether to watch subdirectories
                      debounce: Minimum time between events (ms)
                  """
                  path_list = [paths] if isinstance(paths, str) else list(paths)
                  config = FileWatchConfig(
                      name=name or f"file_watch_{len(self._sources)}",
                      paths=path_list,
                      extensions=extensions,
                      ignore_paths=ignore_paths,
                      recursive=recursive,
                      debounce=debounce,
                  )
                  return await self.add_source(config)
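
               For example, watching Markdown sources while skipping build output might look
               like this (the paths are illustrative):

                   source = await event_manager.add_file_watch(
                       "./docs",
                       extensions=[".md"],
                       ignore_paths=["./docs/_build"],
                       debounce=2000,  # at most one event per path every 2 seconds
                   )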
              

              add_source async

              add_source(config: EventConfig) -> EventSource
              

              Add and start a new event source.

               Parameters:

                   config (EventConfig): Event source configuration. Required.

               Raises:

                   ValueError: If source already exists or is invalid.

              Source code in src/llmling_agent/messaging/event_manager.py
              async def add_source(self, config: EventConfig) -> EventSource:
                  """Add and start a new event source.
              
                  Args:
                      config: Event source configuration
              
                  Raises:
                      ValueError: If source already exists or is invalid
                  """
                  logger.debug("Setting up event source", name=config.name, type=config.type)
                  from evented.base import EventSource
              
                  if config.name in self._sources:
                      msg = f"Event source already exists: {config.name}"
                      raise ValueError(msg)
              
                  try:
                      source = EventSource.from_config(config)
                      await source.__aenter__()
                      self._sources[config.name] = source
                      # Start processing events
                      name = f"event_processor_{config.name}"
                      self.node.task_manager.create_task(self._process_events(source), name=name)
                      logger.debug("Added event source", name=config.name)
                  except Exception as e:
                      msg = "Failed to add event source"
                      logger.exception(msg, name=config.name)
                      raise RuntimeError(msg) from e
                  else:
                      return source
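
               The convenience helpers above all funnel into add_source, so passing a config
               directly is equivalent. A sketch, assuming FileWatchConfig's remaining fields
               have usable defaults:

                   config = FileWatchConfig(name="docs_watch", paths=["./docs"])
                   source = await event_manager.add_source(config)
                   # a second add_source with name="docs_watch" would raise ValueError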
              

              add_timed_event async

              add_timed_event(
                  schedule: str,
                  prompt: str,
                  *,
                  name: str | None = None,
                  timezone: str | None = None,
                  skip_missed: bool = False,
              ) -> TimeEventSource
              

              Add time-based event source.

               Parameters:

                   schedule (str): Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am). Required.
                   prompt (str): Prompt to send when triggered. Required.
                   name (str | None): Optional source name. Default: None.
                   timezone (str | None): Optional timezone (system default if None). Default: None.
                   skip_missed (bool): Whether to skip missed executions. Default: False.
              Source code in src/llmling_agent/messaging/event_manager.py
              async def add_timed_event(
                  self,
                  schedule: str,
                  prompt: str,
                  *,
                  name: str | None = None,
                  timezone: str | None = None,
                  skip_missed: bool = False,
              ) -> TimeEventSource:
                  """Add time-based event source.
              
                  Args:
                      schedule: Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am)
                      prompt: Prompt to send when triggered
                      name: Optional source name
                      timezone: Optional timezone (system default if None)
                      skip_missed: Whether to skip missed executions
                  """
                  config = TimeEventConfig(
                      name=name or f"timed_{len(self._sources)}",
                      schedule=schedule,
                      prompt=prompt,
                      timezone=timezone,
                      skip_missed=skip_missed,
                  )
                  return await self.add_source(config)  # type: ignore
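
               A sketch of a weekday-morning trigger; the cron syntax matches the example in
               the docstring, and the timezone value is illustrative:

                   source = await event_manager.add_timed_event(
                       schedule="0 9 * * 1-5",          # weekdays at 9am
                       prompt="Summarize overnight alerts",
                       timezone="Europe/Berlin",        # illustrative timezone
                       skip_missed=True,                # don't replay missed runs
                   )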
              

              add_webhook async

              add_webhook(
                  path: str, *, name: str | None = None, port: int = 8000, secret: str | None = None
              ) -> EventSource
              

              Add webhook event source.

               Parameters:

                   path (str): URL path to listen on. Required.
                   name (str | None): Optional source name. Default: None.
                   port (int): Port to listen on. Default: 8000.
                   secret (str | None): Optional secret for request validation. Default: None.
              Source code in src/llmling_agent/messaging/event_manager.py
              async def add_webhook(
                  self,
                  path: str,
                  *,
                  name: str | None = None,
                  port: int = 8000,
                  secret: str | None = None,
              ) -> EventSource:
                  """Add webhook event source.
              
                  Args:
                      path: URL path to listen on
                      name: Optional source name
                      port: Port to listen on
                      secret: Optional secret for request validation
                  """
                  name = name or f"webhook_{len(self._sources)}"
                  sec = SecretStr(secret) if secret else None
                  config = WebhookConfig(name=name, path=path, port=port, secret=sec)
                  return await self.add_source(config)
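
               A sketch; the path and secret are illustrative values:

                   source = await event_manager.add_webhook(
                       "/hooks/ci",
                       port=8000,
                       secret="shared-secret",  # wrapped in SecretStr for validation
                   )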
              

              cleanup async

              cleanup()
              

              Clean up all event sources and tasks.

              Source code in src/llmling_agent/messaging/event_manager.py
              async def cleanup(self):
                  """Clean up all event sources and tasks."""
                  self.enabled = False
              
                  for name in list(self._sources):
                      await self.remove_source(name)
              

              emit_event async

              emit_event(event: EventData)
              

              Emit event to all callbacks and optionally handle via node.

              Source code in src/llmling_agent/messaging/event_manager.py
              async def emit_event(self, event: EventData):
                  """Emit event to all callbacks and optionally handle via node."""
                  if not self.enabled:
                      return
              
                  # Run custom callbacks
                  for callback in self._callbacks:
                      try:
                          result = callback(event)
                          if isinstance(result, Awaitable):
                              await result
                      except Exception:
                          logger.exception("Error in event callback", name=callback.__name__)
              
                  # Run default handler if enabled
                  if self.auto_run:
                      try:
                          prompt = event.to_prompt()
                          if prompt:
                              await self.node.run(prompt)
                      except Exception:
                          logger.exception("Error in default event handler")
                  self.event_processed.emit(event)
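
               Events can also be emitted by hand, using the same EventData.create factory
               that track() uses internally; a minimal sketch:

                   event = EventData.create("manual_check", content="disk usage at 92%", metadata={})
                   # runs all callbacks, then node.run(prompt) if auto_run is set
                   await event_manager.emit_event(event)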
              

              poll

              poll(
                  event_type: str, interval: timedelta | None = None
              ) -> Callable[[Callable[..., T]], Callable[..., T]]
              
              poll(
                  event_type: str, interval: timedelta | None = None
              ) -> Callable[
                  [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
              ]
              
              poll(
                  event_type: str, interval: timedelta | None = None
              ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
              

              Decorator to register an event observer.

               Parameters:

                   event_type (str): Type of event to observe. Required.
                   interval (timedelta | None): Optional polling interval for periodic checks. Default: None.
               Example

                   @event_manager.poll("file_changed")
                   async def handle_file_change(event: FileEventData):
                       await process_file(event.path)

              Source code in src/llmling_agent/messaging/event_manager.py
              def poll(
                  self,
                  event_type: str,
                  interval: timedelta | None = None,
              ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                  """Decorator to register an event observer.
              
                  Args:
                      event_type: Type of event to observe
                      interval: Optional polling interval for periodic checks
              
                  Example:
                       @event_manager.poll("file_changed")
                      async def handle_file_change(event: FileEventData):
                          await process_file(event.path)
                  """
              
                  def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                      observer = EventObserver(func, interval=interval)
                      self._observers[event_type].append(observer)
              
                      @wraps(func)
                      async def wrapper(*args: Any, **kwargs: Any) -> Any:
                          result = await execute(func, *args, **kwargs)
                          # Convert result to event and emit
                          if self.enabled:
                              typ = type(result).__name__
                              meta = {"type": "function_result", "output_type": typ}
                              event = FunctionResultEventData(
                                  result=result, source=event_type, metadata=meta
                              )
                              await self.emit_event(event)
                          return result
              
                      return wrapper
              
                  return decorator
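
               A usage sketch; the decorated function's return value is re-emitted as a
               FunctionResultEventData whenever it is called:

                   from datetime import timedelta

                   @event_manager.poll("health_check", interval=timedelta(minutes=5))
                   async def check_health() -> dict[str, str]:
                       return {"status": "ok"}  # wrapped and emitted on each call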
              

              remove_callback

              remove_callback(callback: EventCallback)
              

              Remove a previously registered callback.

              Source code in src/llmling_agent/messaging/event_manager.py
              def remove_callback(self, callback: EventCallback):
                  """Remove a previously registered callback."""
                  self._callbacks.remove(callback)
              

              remove_source async

              remove_source(name: str)
              

              Stop and remove an event source.

               Parameters:

                   name (str): Name of source to remove. Required.
              Source code in src/llmling_agent/messaging/event_manager.py
              async def remove_source(self, name: str):
                  """Stop and remove an event source.
              
                  Args:
                      name: Name of source to remove
                  """
                  if source := self._sources.pop(name, None):
                      await source.__aexit__(None, None, None)
                      logger.debug("Removed event source", name=name)
              

              track

              track(
                  event_name: str | None = None, **event_metadata: Any
              ) -> Callable[[Callable[..., T]], Callable[..., T]]
              
              track(
                  event_name: str | None = None, **event_metadata: Any
              ) -> Callable[
                  [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
              ]
              
              track(
                  event_name: str | None = None, **event_metadata: Any
              ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
              

              Track function calls as events.

               Parameters:

                   event_name (str | None): Optional name for the event (defaults to function name). Default: None.
                   **event_metadata (Any): Additional metadata to include with event. Default: {}.
               Example

                   @event_manager.track("user_search")
                   async def search_docs(query: str) -> list[Doc]:
                       results = await search(query)
                       return results  # This result becomes event data

              Source code in src/llmling_agent/messaging/event_manager.py
              def track(
                  self,
                  event_name: str | None = None,
                  **event_metadata: Any,
              ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                  """Track function calls as events.
              
                  Args:
                      event_name: Optional name for the event (defaults to function name)
                      **event_metadata: Additional metadata to include with event
              
                  Example:
                      @event_manager.track("user_search")
                      async def search_docs(query: str) -> list[Doc]:
                          results = await search(query)
                          return results  # This result becomes event data
                  """
              
                  def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                      name = event_name or func.__name__
              
                      @wraps(func)
                      async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                          start_time = get_now()
                          meta = {"args": args, "kwargs": kwargs, **event_metadata}
                          try:
                              result = await func(*args, **kwargs)
                              if self.enabled:
                                  meta |= {"status": "success", "duration": get_now() - start_time}
                                  event = EventData.create(name, content=result, metadata=meta)
                                  await self.emit_event(event)
                          except Exception as e:
                              if self.enabled:
                                  dur = get_now() - start_time
                                  meta |= {"status": "error", "error": str(e), "duration": dur}
                                  event = EventData.create(name, content=str(e), metadata=meta)
                                  await self.emit_event(event)
                              raise
                          else:
                              return result
              
                      @wraps(func)
                      def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                          start_time = get_now()
                          meta = {"args": args, "kwargs": kwargs, **event_metadata}
                          try:
                              result = func(*args, **kwargs)
                              if self.enabled:
                                  meta |= {"status": "success", "duration": get_now() - start_time}
                                  event = EventData.create(name, content=result, metadata=meta)
                                  self.node.task_manager.run_background(self.emit_event(event))
                          except Exception as e:
                              if self.enabled:
                                  dur = get_now() - start_time
                                  meta |= {"status": "error", "error": str(e), "duration": dur}
                                  event = EventData.create(name, content=str(e), metadata=meta)
                                  self.node.task_manager.run_background(self.emit_event(event))
                              raise
                          else:
                              return result
              
                      return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
              
                  return decorator
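
               Both coroutines and plain functions can be tracked; sync results are emitted
               via a background task. A sketch, where the extra "component" metadata key is
               an illustrative assumption:

                   @event_manager.track("user_search", component="docs")  # extra metadata
                   async def search_docs(query: str) -> list[str]:
                       return [query.lower()]  # success event carries this result

                   @event_manager.track()  # event name defaults to "quick_lookup"
                   def quick_lookup(key: str) -> str:
                       return key  # emitted in the background, since the function is sync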
              

              MessageEmitter

              Bases: ABC

              Base class for all message processing nodes.

              Source code in src/llmling_agent/messaging/messageemitter.py
              class MessageEmitter[TDeps, TResult](ABC):
                  """Base class for all message processing nodes."""
              
                  message_received = Signal(ChatMessage)
                  """Signal emitted when node receives a message."""
              
                  message_sent = Signal(ChatMessage)
                  """Signal emitted when node creates a message."""
              
                  def __init__(
                      self,
                      name: str | None = None,
                      description: str | None = None,
                      context: NodeContext | None = None,
                      mcp_servers: Sequence[str | MCPServerConfig] | None = None,
                      enable_logging: bool = True,
                      progress_handler: ContextualProgressHandler | None = None,
                  ):
                      """Initialize message node."""
                      super().__init__()
                      from llmling_agent.mcp_server.manager import MCPManager
                      from llmling_agent.messaging import EventManager
                      from llmling_agent.messaging.connection_manager import ConnectionManager
              
                      self.task_manager = TaskManager()
                      self._name = name or self.__class__.__name__
                      self.log = logger.bind(agent_name=self._name)
              
                      self.description = description
                      self.connections = ConnectionManager(self)
                      self._events = EventManager(self, enable_events=True)
                      self.mcp = MCPManager(
                          f"node_{self._name}",
                          servers=mcp_servers or [],
                          context=context,
                          owner=self.name,
                          progress_handler=progress_handler,
                      )
                      self.enable_db_logging = enable_logging
                      self.conversation_id = str(uuid4())
                      # Connect to the combined signal to capture all messages
                      # TODO: need to check this
                      # node.message_received.connect(self.log_message)
              
                  async def __aenter__(self) -> Self:
                      """Initialize base message node."""
                      if self.enable_db_logging:
                          await self.context.storage.log_conversation(
                              conversation_id=self.conversation_id,
                              node_name=self.name,
                          )
                      try:
                          await self._events.__aenter__()
                          await self.mcp.__aenter__()
                      except Exception as e:
                          await self.__aexit__(type(e), e, e.__traceback__)
                          msg = f"Failed to initialize {self.name}"
                          raise RuntimeError(msg) from e
                      else:
                          return self
              
                  async def __aexit__(
                      self,
                      exc_type: type[BaseException] | None,
                      exc_val: BaseException | None,
                      exc_tb: TracebackType | None,
                  ):
                      """Clean up base resources."""
                      await self._events.__aexit__(exc_type, exc_val, exc_tb)
                      await self.mcp.__aexit__(exc_type, exc_val, exc_tb)
                      await self.task_manager.cleanup_tasks()
              
                  @property
                  def connection_stats(self) -> AggregatedTalkStats:
                      """Get stats for all active connections of this node."""
                      stats = [talk.stats for talk in self.connections.get_connections()]
                      return AggregatedTalkStats(stats=stats)
              
                  @property
                  def context(self) -> NodeContext:
                      """Get node context."""
                      raise NotImplementedError
              
                  @property
                  def name(self) -> str:
                      """Get agent name."""
                      return self._name or "llmling-agent"
              
                  @name.setter
                  def name(self, value: str):
                      self._name = value
              
                  @overload
                  def __rshift__(
                      self, other: MessageNode[Any, Any] | ProcessorCallback[Any]
                  ) -> Talk[TResult]: ...
              
                  @overload
                  def __rshift__(
                      self, other: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]]
                  ) -> TeamTalk[TResult]: ...
              
                  def __rshift__(
                      self,
                      other: MessageNode[Any, Any]
                      | ProcessorCallback[Any]
                      | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                  ) -> Talk[Any] | TeamTalk[Any]:
                      """Connect agent to another agent or group.
              
                      Example:
                          agent >> other_agent  # Connect to single agent
                          agent >> (agent2 & agent3)  # Connect to group
                          agent >> "other_agent"  # Connect by name (needs pool)
                      """
                      return self.connect_to(other)
              
                  @overload
                  def connect_to(
                      self,
                      target: MessageNode[Any, Any] | ProcessorCallback[Any],
                      *,
                      queued: Literal[True],
                      queue_strategy: Literal["concat"],
                  ) -> Talk[str]: ...
              
                  @overload
                  def connect_to(
                      self,
                      target: MessageNode[Any, Any] | ProcessorCallback[Any],
                      *,
                      connection_type: ConnectionType = "run",
                      name: str | None = None,
                      priority: int = 0,
                      delay: timedelta | None = None,
                      queued: bool = False,
                      queue_strategy: QueueStrategy = "latest",
                      transform: AnyTransformFn | None = None,
                      filter_condition: AsyncFilterFn | None = None,
                      stop_condition: AsyncFilterFn | None = None,
                      exit_condition: AsyncFilterFn | None = None,
                  ) -> Talk[TResult]: ...
              
                  @overload
                  def connect_to(
                      self,
                      target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                      *,
                      queued: Literal[True],
                      queue_strategy: Literal["concat"],
                  ) -> TeamTalk[str]: ...
              
                  @overload
                  def connect_to(
                      self,
                      target: Sequence[MessageNode[Any, TResult] | ProcessorCallback[TResult]],
                      *,
                      connection_type: ConnectionType = "run",
                      name: str | None = None,
                      priority: int = 0,
                      delay: timedelta | None = None,
                      queued: bool = False,
                      queue_strategy: QueueStrategy = "latest",
                      transform: AnyTransformFn | None = None,
                      filter_condition: AsyncFilterFn | None = None,
                      stop_condition: AsyncFilterFn | None = None,
                      exit_condition: AsyncFilterFn | None = None,
                  ) -> TeamTalk[TResult]: ...
              
                  @overload
                  def connect_to(
                      self,
                      target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                      *,
                      connection_type: ConnectionType = "run",
                      name: str | None = None,
                      priority: int = 0,
                      delay: timedelta | None = None,
                      queued: bool = False,
                      queue_strategy: QueueStrategy = "latest",
                      transform: AnyTransformFn | None = None,
                      filter_condition: AsyncFilterFn | None = None,
                      stop_condition: AsyncFilterFn | None = None,
                      exit_condition: AsyncFilterFn | None = None,
                  ) -> TeamTalk: ...
              
                  def connect_to(
                      self,
                      target: MessageNode[Any, Any]
                      | ProcessorCallback[Any]
                      | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                      *,
                      connection_type: ConnectionType = "run",
                      name: str | None = None,
                      priority: int = 0,
                      delay: timedelta | None = None,
                      queued: bool = False,
                      queue_strategy: QueueStrategy = "latest",
                      transform: AnyTransformFn | None = None,
                      filter_condition: AsyncFilterFn | None = None,
                      stop_condition: AsyncFilterFn | None = None,
                      exit_condition: AsyncFilterFn | None = None,
                  ) -> Talk[Any] | TeamTalk:
                      """Create connection(s) to target(s)."""
                      # Handle callable case
                      from llmling_agent import MessageNode
                      from llmling_agent.agent import Agent
                      from llmling_agent.delegation.base_team import BaseTeam
              
                      if callable(target):
                          target = Agent.from_callback(target)
                          if pool := self.context.pool:
                              pool.register(target.name, target)
                       # we are explicit here just to make the distinction clear: we only
                       # want sequences of message units
                      if isinstance(target, Sequence) and not isinstance(target, BaseTeam):
                          targets: list[MessageNode] = []
                          for t in target:
                              match t:
                                  case _ if callable(t):
                                      other: MessageNode = Agent.from_callback(t)
                                      if pool := self.context.pool:
                                          pool.register(other.name, other)
                                      targets.append(other)
                                  case MessageNode():
                                      targets.append(t)
                                  case _:
                                      msg = f"Invalid node type: {type(t)}"
                                      raise TypeError(msg)
                      else:
                          targets = target  # type: ignore
                      return self.connections.create_connection(
                          self,
                          targets,
                          connection_type=connection_type,
                          priority=priority,
                          name=name,
                          delay=delay,
                          queued=queued,
                          queue_strategy=queue_strategy,
                          transform=transform,
                          filter_condition=filter_condition,
                          stop_condition=stop_condition,
                          exit_condition=exit_condition,
                      )
              
                  async def disconnect_all(self):
                      """Disconnect from all nodes."""
                      for target in list(self.connections.get_targets()):
                          self.stop_passing_results_to(target)
              
                  def stop_passing_results_to(self, other: MessageNode):
                      """Stop forwarding results to another node."""
                      self.connections.disconnect(other)
              
                  async def pre_run(
                      self,
                      *prompt: PromptCompatible | ChatMessage,
                  ) -> tuple[ChatMessage[Any], list[BaseContent | str]]:
                      """Hook to prepare a MessgeNode run call.
              
                      Args:
                          *prompt: The prompt(s) to prepare.
              
                      Returns:
                          A tuple of:
                              - Either incoming message, or a constructed incoming message based
                                on the prompt(s).
                              - A list of prompts to be sent to the model.
                      """
                      if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                          user_msg = prompt[0]
                          prompts = await convert_prompts([user_msg.content])
                          # Update received message's chain to show it came through its source
                          user_msg = user_msg.forwarded(prompt[0])
                          # change role since "perspective" changes, clear cost to avoid counting twice
                          user_msg = replace(user_msg, role="user", cost_info=None)
                          final_prompt = "\n\n".join(str(p) for p in prompts)
                      else:
                          prompts = await convert_prompts(prompt)
                          final_prompt = "\n\n".join(str(p) for p in prompts)
                          id_ = str(uuid4())
                          user_msg = ChatMessage(content=final_prompt, role="user", conversation_id=id_)
                      self.message_received.emit(user_msg)
                      self.context.current_prompt = final_prompt
                      return user_msg, prompts
              
                  # async def post_run(
                  #     self,
                  #     message: ChatMessage[TResult],
                  #     previous_message: ChatMessage[Any] | None,
                  #     wait_for_connections: bool | None = None,
                  # ) -> ChatMessage[Any]:
                  #     # For chain processing, update the response's chain
                  #     if previous_message:
                  #         message = message.forwarded(previous_message)
                  #         conversation_id = previous_message.conversation_id
                  #     else:
                  #         conversation_id = str(uuid4())
                  #     # Set conversation_id on response message
                  #     message = replace(message, conversation_id=conversation_id)
                  #     self.message_sent.emit(message)
                  #     await self.connections.route_message(message, wait=wait_for_connections)
                  #     return message
              
                  async def run(
                      self,
                      *prompt: PromptCompatible | ChatMessage,
                      wait_for_connections: bool | None = None,
                      store_history: bool = True,
                      **kwargs: Any,
                  ) -> ChatMessage[TResult]:
                      """Execute node with prompts and handle message routing.
              
                      Args:
                          prompt: Input prompts
                          wait_for_connections: Whether to wait for forwarded messages
                          store_history: Whether to store in conversation history
                          **kwargs: Additional arguments for _run
                      """
                      from llmling_agent import Agent
              
                      user_msg, prompts = await self.pre_run(*prompt)
                      message = await self._run(
                          *prompts,
                          store_history=store_history,
                          conversation_id=user_msg.conversation_id,
                          **kwargs,
                      )
              
                      # For chain processing, update the response's chain
                      if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                          message = message.forwarded(prompt[0])
              
                      if store_history and isinstance(self, Agent):
                          self.conversation.add_chat_messages([user_msg, message])
                      self.message_sent.emit(message)
                      await self.log_message(message)
                      await self.connections.route_message(message, wait=wait_for_connections)
                      return message
              
                  @abstractmethod
                  def _run(
                      self,
                      *prompts: Any,
                      **kwargs: Any,
                  ) -> Coroutine[None, None, ChatMessage[TResult]]:
                      """Implementation-specific run logic."""
              
                  async def get_message_history(self, limit: int | None = None) -> list[ChatMessage]:
                      """Get message history from storage."""
                      if not self.enable_db_logging:
                          return []  # No history if not logging
              
                      from llmling_agent_config.session import SessionQuery
              
                      query = SessionQuery(name=self.conversation_id, limit=limit)
                      return await self.context.storage.filter_messages(query)
              
                  async def log_message(self, message: ChatMessage):
                      """Handle message from chat signal."""
                      if self.enable_db_logging:
                          await self.context.storage.log_message(message)  # pyright: ignore
              

              connection_stats property

              connection_stats: AggregatedTalkStats
              

              Get stats for all active connections of this node.

              context property

              context: NodeContext
              

              Get node context.

              message_received class-attribute instance-attribute

              message_received = Signal(ChatMessage)
              

              Signal emitted when node receives a message.

              message_sent class-attribute instance-attribute

              message_sent = Signal(ChatMessage)
              

              Signal emitted when node creates a message.

              name property writable

              name: str
              

              Get agent name.

              __aenter__ async

              __aenter__() -> Self
              

              Initialize base message node.

              Source code in src/llmling_agent/messaging/messageemitter.py
              async def __aenter__(self) -> Self:
                  """Initialize base message node."""
                  if self.enable_db_logging:
                      await self.context.storage.log_conversation(
                          conversation_id=self.conversation_id,
                          node_name=self.name,
                      )
                  try:
                      await self._events.__aenter__()
                      await self.mcp.__aenter__()
                  except Exception as e:
                      await self.__aexit__(type(e), e, e.__traceback__)
                      msg = f"Failed to initialize {self.name}"
                      raise RuntimeError(msg) from e
                  else:
                      return self
              

              __aexit__ async

              __aexit__(
                  exc_type: type[BaseException] | None,
                  exc_val: BaseException | None,
                  exc_tb: TracebackType | None,
              )
              

              Clean up base resources.

              Source code in src/llmling_agent/messaging/messageemitter.py
              async def __aexit__(
                  self,
                  exc_type: type[BaseException] | None,
                  exc_val: BaseException | None,
                  exc_tb: TracebackType | None,
              ):
                  """Clean up base resources."""
                  await self._events.__aexit__(exc_type, exc_val, exc_tb)
                  await self.mcp.__aexit__(exc_type, exc_val, exc_tb)
                  await self.task_manager.cleanup_tasks()
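
              Taken together, __aenter__ and __aexit__ make every node an async
              context manager, so a failed setup is rolled back and event sources,
              MCP servers and pending tasks are always released. A minimal sketch,
              assuming a callback-built agent stands in for any concrete node:

                  import asyncio

                  from llmling_agent import Agent


                  def echo(text: str) -> str:
                      """Toy processor; Agent.from_callback wraps it into an agent node."""
                      return text


                  async def main():
                      node = Agent.from_callback(echo)
                      # A failed __aenter__ cleans up via __aexit__ and raises
                      # RuntimeError("Failed to initialize ..."); on success the
                      # node itself is returned.
                      async with node:
                          ...  # use the node here
                      # Event sources, MCP servers and pending tasks are now released.


                  asyncio.run(main())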
              

              __init__

              __init__(
                  name: str | None = None,
                  description: str | None = None,
                  context: NodeContext | None = None,
                  mcp_servers: Sequence[str | MCPServerConfig] | None = None,
                  enable_logging: bool = True,
                  progress_handler: ContextualProgressHandler | None = None,
              )
              

              Initialize message node.

              Source code in src/llmling_agent/messaging/messageemitter.py
              def __init__(
                  self,
                  name: str | None = None,
                  description: str | None = None,
                  context: NodeContext | None = None,
                  mcp_servers: Sequence[str | MCPServerConfig] | None = None,
                  enable_logging: bool = True,
                  progress_handler: ContextualProgressHandler | None = None,
              ):
                  """Initialize message node."""
                  super().__init__()
                  from llmling_agent.mcp_server.manager import MCPManager
                  from llmling_agent.messaging import EventManager
                  from llmling_agent.messaging.connection_manager import ConnectionManager
              
                  self.task_manager = TaskManager()
                  self._name = name or self.__class__.__name__
                  self.log = logger.bind(agent_name=self._name)
              
                  self.description = description
                  self.connections = ConnectionManager(self)
                  self._events = EventManager(self, enable_events=True)
                  self.mcp = MCPManager(
                      f"node_{self._name}",
                      servers=mcp_servers or [],
                      context=context,
                      owner=self.name,
                      progress_handler=progress_handler,
                  )
                  self.enable_db_logging = enable_logging
                  self.conversation_id = str(uuid4())
              

              __rshift__

              __rshift__(other: MessageNode[Any, Any] | ProcessorCallback[Any]) -> Talk[TResult]
              
              __rshift__(
                  other: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
              ) -> TeamTalk[TResult]
              
              __rshift__(
                  other: MessageNode[Any, Any]
                  | ProcessorCallback[Any]
                  | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
              ) -> Talk[Any] | TeamTalk[Any]
              

              Connect agent to another agent or group.

              Example

              agent >> other_agent        # Connect to single agent
              agent >> (agent2 & agent3)  # Connect to group
              agent >> "other_agent"      # Connect by name (needs pool)

              Source code in src/llmling_agent/messaging/messageemitter.py
              def __rshift__(
                  self,
                  other: MessageNode[Any, Any]
                  | ProcessorCallback[Any]
                  | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
              ) -> Talk[Any] | TeamTalk[Any]:
                  """Connect agent to another agent or group.
              
                  Example:
                      agent >> other_agent  # Connect to single agent
                      agent >> (agent2 & agent3)  # Connect to group
                      agent >> "other_agent"  # Connect by name (needs pool)
                  """
                  return self.connect_to(other)
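
              Because connect_to wraps plain callables, the operator also accepts a
              function directly. A hedged sketch (handle and wire are illustrative
              names, not part of the API):

                  from llmling_agent import Agent


                  def handle(text: str) -> str:
                      """Stand-in downstream processor."""
                      return f"received: {text}"


                  def wire(agent: Agent):
                      # Equivalent to agent.connect_to(handle); returns a Talk object.
                      return agent >> handle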
              

              connect_to

              connect_to(
                  target: MessageNode[Any, Any] | ProcessorCallback[Any],
                  *,
                  queued: Literal[True],
                  queue_strategy: Literal["concat"],
              ) -> Talk[str]
              
              connect_to(
                  target: MessageNode[Any, Any] | ProcessorCallback[Any],
                  *,
                  connection_type: ConnectionType = "run",
                  name: str | None = None,
                  priority: int = 0,
                  delay: timedelta | None = None,
                  queued: bool = False,
                  queue_strategy: QueueStrategy = "latest",
                  transform: AnyTransformFn | None = None,
                  filter_condition: AsyncFilterFn | None = None,
                  stop_condition: AsyncFilterFn | None = None,
                  exit_condition: AsyncFilterFn | None = None,
              ) -> Talk[TResult]
              
              connect_to(
                  target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                  *,
                  queued: Literal[True],
                  queue_strategy: Literal["concat"],
              ) -> TeamTalk[str]
              
              connect_to(
                  target: Sequence[MessageNode[Any, TResult] | ProcessorCallback[TResult]],
                  *,
                  connection_type: ConnectionType = "run",
                  name: str | None = None,
                  priority: int = 0,
                  delay: timedelta | None = None,
                  queued: bool = False,
                  queue_strategy: QueueStrategy = "latest",
                  transform: AnyTransformFn | None = None,
                  filter_condition: AsyncFilterFn | None = None,
                  stop_condition: AsyncFilterFn | None = None,
                  exit_condition: AsyncFilterFn | None = None,
              ) -> TeamTalk[TResult]
              
              connect_to(
                  target: Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                  *,
                  connection_type: ConnectionType = "run",
                  name: str | None = None,
                  priority: int = 0,
                  delay: timedelta | None = None,
                  queued: bool = False,
                  queue_strategy: QueueStrategy = "latest",
                  transform: AnyTransformFn | None = None,
                  filter_condition: AsyncFilterFn | None = None,
                  stop_condition: AsyncFilterFn | None = None,
                  exit_condition: AsyncFilterFn | None = None,
              ) -> TeamTalk
              
              connect_to(
                  target: MessageNode[Any, Any]
                  | ProcessorCallback[Any]
                  | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                  *,
                  connection_type: ConnectionType = "run",
                  name: str | None = None,
                  priority: int = 0,
                  delay: timedelta | None = None,
                  queued: bool = False,
                  queue_strategy: QueueStrategy = "latest",
                  transform: AnyTransformFn | None = None,
                  filter_condition: AsyncFilterFn | None = None,
                  stop_condition: AsyncFilterFn | None = None,
                  exit_condition: AsyncFilterFn | None = None,
              ) -> Talk[Any] | TeamTalk
              

              Create connection(s) to target(s).

              Source code in src/llmling_agent/messaging/messageemitter.py
              def connect_to(
                  self,
                  target: MessageNode[Any, Any]
                  | ProcessorCallback[Any]
                  | Sequence[MessageNode[Any, Any] | ProcessorCallback[Any]],
                  *,
                  connection_type: ConnectionType = "run",
                  name: str | None = None,
                  priority: int = 0,
                  delay: timedelta | None = None,
                  queued: bool = False,
                  queue_strategy: QueueStrategy = "latest",
                  transform: AnyTransformFn | None = None,
                  filter_condition: AsyncFilterFn | None = None,
                  stop_condition: AsyncFilterFn | None = None,
                  exit_condition: AsyncFilterFn | None = None,
              ) -> Talk[Any] | TeamTalk:
                  """Create connection(s) to target(s)."""
                  # Handle callable case
                  from llmling_agent import MessageNode
                  from llmling_agent.agent import Agent
                  from llmling_agent.delegation.base_team import BaseTeam
              
                  if callable(target):
                      target = Agent.from_callback(target)
                      if pool := self.context.pool:
                          pool.register(target.name, target)
                  # we are explicit here just to make the distinction clear: we only
                  # want sequences of message nodes
                  if isinstance(target, Sequence) and not isinstance(target, BaseTeam):
                      targets: list[MessageNode] = []
                      for t in target:
                          match t:
                              case _ if callable(t):
                                  other: MessageNode = Agent.from_callback(t)
                                  if pool := self.context.pool:
                                      pool.register(other.name, other)
                                  targets.append(other)
                              case MessageNode():
                                  targets.append(t)
                              case _:
                                  msg = f"Invalid node type: {type(t)}"
                                  raise TypeError(msg)
                  else:
                      targets = target  # type: ignore
                  return self.connections.create_connection(
                      self,
                      targets,
                      connection_type=connection_type,
                      priority=priority,
                      name=name,
                      delay=delay,
                      queued=queued,
                      queue_strategy=queue_strategy,
                      transform=transform,
                      filter_condition=filter_condition,
                      stop_condition=stop_condition,
                      exit_condition=exit_condition,
                  )
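
              Beyond the >> shorthand, connect_to exposes the full set of routing
              options. A hedged sketch of wiring a node to a plain callable
              (summarize and wire are illustrative names):

                  from llmling_agent import Agent


                  def summarize(text: str) -> str:
                      """Stand-in processor; callables are wrapped via Agent.from_callback."""
                      return text[:200]


                  def wire(source: Agent):
                      # Queued delivery buffers messages; with queue_strategy="latest"
                      # only the newest queued message is handed over on flush.
                      talk = source.connect_to(
                          summarize,
                          connection_type="run",  # deliver by invoking the target's run()
                          queued=True,
                          queue_strategy="latest",
                          name="summary-link",    # optional label for the connection
                      )
                      return talk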
              

              disconnect_all async

              disconnect_all()
              

              Disconnect from all nodes.

              Source code in src/llmling_agent/messaging/messageemitter.py
              async def disconnect_all(self):
                  """Disconnect from all nodes."""
                  for target in list(self.connections.get_targets()):
                      self.stop_passing_results_to(target)
              

              get_message_history async

              get_message_history(limit: int | None = None) -> list[ChatMessage]
              

              Get message history from storage.

              Source code in src/llmling_agent/messaging/messageemitter.py
              async def get_message_history(self, limit: int | None = None) -> list[ChatMessage]:
                  """Get message history from storage."""
                  if not self.enable_db_logging:
                      return []  # No history if not logging
              
                  from llmling_agent_config.session import SessionQuery
              
                  query = SessionQuery(name=self.conversation_id, limit=limit)
                  return await self.context.storage.filter_messages(query)
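
              A small usage sketch; it assumes the node was created with
              enable_logging=True and that a storage provider is configured on the
              node's context:

                  async def show_recent(node):
                      # Returns [] when db logging is disabled for this node.
                      history = await node.get_message_history(limit=10)
                      for msg in history:
                          print(msg.role, msg.content)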
              

              log_message async

              log_message(message: ChatMessage)
              

              Handle message from chat signal.

              Source code in src/llmling_agent/messaging/messageemitter.py
              async def log_message(self, message: ChatMessage):
                  """Handle message from chat signal."""
                  if self.enable_db_logging:
                      await self.context.storage.log_message(message)  # pyright: ignore
              

              pre_run async

              pre_run(
                  *prompt: PromptCompatible | ChatMessage,
              ) -> tuple[ChatMessage[Any], list[BaseContent | str]]
              

              Hook to prepare a MessageNode run call.

              Parameters:

                  *prompt (PromptCompatible | ChatMessage): The prompt(s) to prepare. Default: ().

              Returns:

                  tuple[ChatMessage[Any], list[BaseContent | str]]: A tuple of:
                      - Either the incoming message, or a constructed incoming message
                        based on the prompt(s).
                      - A list of prompts to be sent to the model.

              Source code in src/llmling_agent/messaging/messageemitter.py
              async def pre_run(
                  self,
                  *prompt: PromptCompatible | ChatMessage,
              ) -> tuple[ChatMessage[Any], list[BaseContent | str]]:
                  """Hook to prepare a MessgeNode run call.
              
                  Args:
                      *prompt: The prompt(s) to prepare.
              
                  Returns:
                      A tuple of:
                          - Either incoming message, or a constructed incoming message based
                            on the prompt(s).
                          - A list of prompts to be sent to the model.
                  """
                  if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                      user_msg = prompt[0]
                      prompts = await convert_prompts([user_msg.content])
                      # Update received message's chain to show it came through its source
                      user_msg = user_msg.forwarded(prompt[0])
                      # change role since "perspective" changes, clear cost to avoid counting twice
                      user_msg = replace(user_msg, role="user", cost_info=None)
                      final_prompt = "\n\n".join(str(p) for p in prompts)
                  else:
                      prompts = await convert_prompts(prompt)
                      final_prompt = "\n\n".join(str(p) for p in prompts)
                      id_ = str(uuid4())
                      user_msg = ChatMessage(content=final_prompt, role="user", conversation_id=id_)
                  self.message_received.emit(user_msg)
                  self.context.current_prompt = final_prompt
                  return user_msg, prompts
              

              run async

              run(
                  *prompt: PromptCompatible | ChatMessage,
                  wait_for_connections: bool | None = None,
                  store_history: bool = True,
                  **kwargs: Any,
              ) -> ChatMessage[TResult]
              

              Execute node with prompts and handle message routing.

              Parameters:

                  prompt (PromptCompatible | ChatMessage): Input prompts. Default: ().
                  wait_for_connections (bool | None): Whether to wait for forwarded messages. Default: None.
                  store_history (bool): Whether to store in conversation history. Default: True.
                  **kwargs (Any): Additional arguments for _run. Default: {}.
              Source code in src/llmling_agent/messaging/messageemitter.py
              async def run(
                  self,
                  *prompt: PromptCompatible | ChatMessage,
                  wait_for_connections: bool | None = None,
                  store_history: bool = True,
                  **kwargs: Any,
              ) -> ChatMessage[TResult]:
                  """Execute node with prompts and handle message routing.
              
                  Args:
                      prompt: Input prompts
                      wait_for_connections: Whether to wait for forwarded messages
                      store_history: Whether to store in conversation history
                      **kwargs: Additional arguments for _run
                  """
                  from llmling_agent import Agent
              
                  user_msg, prompts = await self.pre_run(*prompt)
                  message = await self._run(
                      *prompts,
                      store_history=store_history,
                      conversation_id=user_msg.conversation_id,
                      **kwargs,
                  )
              
                  # For chain processing, update the response's chain
                  if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                      message = message.forwarded(prompt[0])
              
                  if store_history and isinstance(self, Agent):
                      self.conversation.add_chat_messages([user_msg, message])
                  self.message_sent.emit(message)
                  await self.log_message(message)
                  await self.connections.route_message(message, wait=wait_for_connections)
                  return message
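
              In practice one run call covers the whole pipeline: pre_run normalizes
              the prompts, _run produces the response, and the result is emitted,
              logged and routed. A minimal sketch with a callback-built agent:

                  import asyncio

                  from llmling_agent import Agent


                  def shout(text: str) -> str:
                      return text.upper()


                  async def main():
                      node = Agent.from_callback(shout)
                      async with node:
                          # wait_for_connections=True waits until forwarded messages
                          # have been delivered to all connected nodes.
                          reply = await node.run("hello", wait_for_connections=True)
                          print(reply.content)


                  asyncio.run(main())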
              

              stop_passing_results_to

              stop_passing_results_to(other: MessageNode)
              

              Stop forwarding results to another node.

              Source code in src/llmling_agent/messaging/messageemitter.py
              def stop_passing_results_to(self, other: MessageNode):
                  """Stop forwarding results to another node."""
                  self.connections.disconnect(other)
              

              MessageNode

              Bases: MessageEmitter[TDeps, TResult]

              Base class for all message processing nodes.

              Source code in src/llmling_agent/messaging/messagenode.py
              class MessageNode[TDeps, TResult](MessageEmitter[TDeps, TResult]):
                  """Base class for all message processing nodes."""
              
                  async def pre_run(
                      self,
                      *prompt: PromptCompatible | ChatMessage,
                  ) -> tuple[ChatMessage[Any], list[BaseContent | str]]:
                      """Hook to prepare a MessgeNode run call.
              
                      Args:
                          *prompt: The prompt(s) to prepare.
              
                      Returns:
                          A tuple of:
                              - Either incoming message, or a constructed incoming message based
                                on the prompt(s).
                              - A list of prompts to be sent to the model.
                      """
                      if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                          user_msg = prompt[0]
                          prompts = await convert_prompts([user_msg.content])
                          # Update received message's chain to show it came through its source
                          user_msg = user_msg.forwarded(prompt[0]).to_request()
                          # clear cost info to avoid double-counting
                          final_prompt = "\n\n".join(str(p) for p in prompts)
                      else:
                          prompts = await convert_prompts(prompt)
                          final_prompt = "\n\n".join(str(p) for p in prompts)
                          # use format_prompts?
                          messages = [i if isinstance(i, str) else i.to_pydantic_ai() for i in prompts]
                          user_msg = ChatMessage.user_prompt(message=messages)
                      self.message_received.emit(user_msg)
                      self.context.current_prompt = final_prompt
                      return user_msg, prompts
              
                  # async def post_run(
                  #     self,
                  #     message: ChatMessage[TResult],
                  #     previous_message: ChatMessage[Any] | None,
                  #     wait_for_connections: bool | None = None,
                  # ) -> ChatMessage[Any]:
                  #     # For chain processing, update the response's chain
                  #     if previous_message:
                  #         message = message.forwarded(previous_message)
                  #         conversation_id = previous_message.conversation_id
                  #     else:
                  #         conversation_id = str(uuid4())
                  #     # Set conversation_id on response message
                  #     message = replace(message, conversation_id=conversation_id)
                  #     self.message_sent.emit(message)
                  #     await self.log_message(response_msg)
                  #     await self.connections.route_message(message, wait=wait_for_connections)
                  #     return message
              
                  # @overload
                  # async def run(
                  #     self,
                  #     *prompt: PromptCompatible | ChatMessage,
                  #     wait_for_connections: bool | None = None,
                  #     store_history: bool = True,
                  #     output_type: None,
                  #     **kwargs: Any,
                  # ) -> ChatMessage[TResult]: ...
              
                  # @overload
                  # async def run[OutputTypeT](
                  #     self,
                  #     *prompt: PromptCompatible | ChatMessage,
                  #     wait_for_connections: bool | None = None,
                  #     store_history: bool = True,
                  #     output_type: type[OutputTypeT],
                  #     **kwargs: Any,
                  # ) -> ChatMessage[OutputTypeT]: ...
              
                  @method_spawner
                  async def run[OutputTypeT](
                      self,
                      *prompt: PromptCompatible | ChatMessage,
                      wait_for_connections: bool | None = None,
                      store_history: bool = True,
                      output_type: type[OutputTypeT] | None = None,
                      **kwargs: Any,
                  ) -> ChatMessage[Any]:
                      """Execute node with prompts and handle message routing.
              
                      Args:
                          prompt: Input prompts
                          wait_for_connections: Whether to wait for forwarded messages
                          store_history: Whether to store in conversation history
                          output_type: Type of output to expect
                          **kwargs: Additional arguments for _run
                      """
                      from llmling_agent import Agent
              
                      user_msg, prompts = await self.pre_run(*prompt)
                      message = await self._run(
                          *prompts,
                          store_history=store_history,
                          conversation_id=user_msg.conversation_id,
                          output_type=output_type,
                          **kwargs,
                      )
              
                      # For chain processing, update the response's chain
                      if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                          message = message.forwarded(prompt[0])
              
                      if store_history and isinstance(self, Agent):
                          self.conversation.add_chat_messages([user_msg, message])
                      self.message_sent.emit(message)
                      await self.connections.route_message(message, wait=wait_for_connections)
                      return message
              
                  @abstractmethod
                  async def get_stats(self) -> MessageStats | AggregatedMessageStats:
                      """Get message statistics for this node."""
              
                  @abstractmethod
                  def run_iter(
                      self,
                      *prompts: Any,
                      **kwargs: Any,
                  ) -> AsyncIterator[ChatMessage[Any]]:
                      """Yield messages during execution."""
              

              get_stats abstractmethod async

              Get message statistics for this node.

              Source code in src/llmling_agent/messaging/messagenode.py
              @abstractmethod
              async def get_stats(self) -> MessageStats | AggregatedMessageStats:
                  """Get message statistics for this node."""
              

              pre_run async

              pre_run(
                  *prompt: PromptCompatible | ChatMessage,
              ) -> tuple[ChatMessage[Any], list[BaseContent | str]]
              

              Hook to prepare a MessageNode run call.

              Parameters:

                  *prompt (PromptCompatible | ChatMessage): The prompt(s) to prepare. Default: ().

              Returns:

                  tuple[ChatMessage[Any], list[BaseContent | str]]: A tuple of:
                      - Either the incoming message, or a constructed incoming message
                        based on the prompt(s).
                      - A list of prompts to be sent to the model.

              Source code in src/llmling_agent/messaging/messagenode.py
              async def pre_run(
                  self,
                  *prompt: PromptCompatible | ChatMessage,
              ) -> tuple[ChatMessage[Any], list[BaseContent | str]]:
                  """Hook to prepare a MessgeNode run call.
              
                  Args:
                      *prompt: The prompt(s) to prepare.
              
                  Returns:
                      A tuple of:
                          - Either incoming message, or a constructed incoming message based
                            on the prompt(s).
                          - A list of prompts to be sent to the model.
                  """
                  if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                      user_msg = prompt[0]
                      prompts = await convert_prompts([user_msg.content])
                      # Update received message's chain to show it came through its source
                      user_msg = user_msg.forwarded(prompt[0]).to_request()
                      # clear cost info to avoid double-counting
                      final_prompt = "\n\n".join(str(p) for p in prompts)
                  else:
                      prompts = await convert_prompts(prompt)
                      final_prompt = "\n\n".join(str(p) for p in prompts)
                      # use format_prompts?
                      messages = [i if isinstance(i, str) else i.to_pydantic_ai() for i in prompts]
                      user_msg = ChatMessage.user_prompt(message=messages)
                  self.message_received.emit(user_msg)
                  self.context.current_prompt = final_prompt
                  return user_msg, prompts
              

              run async

              run(
                  *prompt: PromptCompatible | ChatMessage,
                  wait_for_connections: bool | None = None,
                  store_history: bool = True,
                  output_type: type[OutputTypeT] | None = None,
                  **kwargs: Any,
              ) -> ChatMessage[Any]
              

              Execute node with prompts and handle message routing.

              Parameters:

                  prompt (PromptCompatible | ChatMessage): Input prompts. Default: ().
                  wait_for_connections (bool | None): Whether to wait for forwarded messages. Default: None.
                  store_history (bool): Whether to store in conversation history. Default: True.
                  output_type (type[OutputTypeT] | None): Type of output to expect. Default: None.
                  **kwargs (Any): Additional arguments for _run. Default: {}.
              Source code in src/llmling_agent/messaging/messagenode.py
              @method_spawner
              async def run[OutputTypeT](
                  self,
                  *prompt: PromptCompatible | ChatMessage,
                  wait_for_connections: bool | None = None,
                  store_history: bool = True,
                  output_type: type[OutputTypeT] | None = None,
                  **kwargs: Any,
              ) -> ChatMessage[Any]:
                  """Execute node with prompts and handle message routing.
              
                  Args:
                      prompt: Input prompts
                      wait_for_connections: Whether to wait for forwarded messages
                      store_history: Whether to store in conversation history
                      output_type: Type of output to expect
                      **kwargs: Additional arguments for _run
                  """
                  from llmling_agent import Agent
              
                  user_msg, prompts = await self.pre_run(*prompt)
                  message = await self._run(
                      *prompts,
                      store_history=store_history,
                      conversation_id=user_msg.conversation_id,
                      output_type=output_type,
                      **kwargs,
                  )
              
                  # For chain processing, update the response's chain
                  if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                      message = message.forwarded(prompt[0])
              
                  if store_history and isinstance(self, Agent):
                      self.conversation.add_chat_messages([user_msg, message])
                  self.message_sent.emit(message)
                  await self.connections.route_message(message, wait=wait_for_connections)
                  return message
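
              The output_type argument is forwarded to _run, so structured output can
              be requested per call. A hedged sketch; Verdict is an illustrative
              Pydantic model, and whether the schema is honored depends on the
              underlying provider:

                  from pydantic import BaseModel


                  class Verdict(BaseModel):
                      ok: bool
                      reason: str


                  async def judge(node, text: str) -> Verdict:
                      # With output_type set, message.content should be a Verdict
                      # instance rather than plain text.
                      message = await node.run(text, output_type=Verdict)
                      return message.content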
              

              run_iter abstractmethod

              run_iter(*prompts: Any, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]
              

              Yield messages during execution.

              Source code in src/llmling_agent/messaging/messagenode.py
              @abstractmethod
              def run_iter(
                  self,
                  *prompts: Any,
                  **kwargs: Any,
              ) -> AsyncIterator[ChatMessage[Any]]:
                  """Yield messages during execution."""
              

              TeamResponse

              Bases: list[AgentResponse[Any]]

              Results from a team execution.

              Source code in src/llmling_agent/messaging/messages.py
              class TeamResponse[TMessageContent](list[AgentResponse[Any]]):
                  """Results from a team execution."""
              
                  def __init__(
                      self,
                      responses: list[AgentResponse[TMessageContent]],
                      start_time: datetime | None = None,
                      errors: dict[str, Exception] | None = None,
                  ):
                      super().__init__(responses)
                      self.start_time = start_time or get_now()
                      self.end_time = get_now()
                      self.errors = errors or {}
              
                  @property
                  def duration(self) -> float:
                      """Get execution duration in seconds."""
                      return (self.end_time - self.start_time).total_seconds()
              
                  @property
                  def success(self) -> bool:
                      """Whether all agents completed successfully."""
                      return not bool(self.errors)
              
                  @property
                  def failed_agents(self) -> list[str]:
                      """Names of agents that failed."""
                      return list(self.errors.keys())
              
                  def by_agent(self, name: str) -> AgentResponse[TMessageContent] | None:
                      """Get response from specific agent."""
                      return next((r for r in self if r.agent_name == name), None)
              
                  def format_durations(self) -> str:
                      """Format execution times."""
                      parts = [f"{r.agent_name}: {r.timing:.2f}s" for r in self if r.timing is not None]
                      return f"Individual times: {', '.join(parts)}\nTotal time: {self.duration:.2f}s"
              
                  # TODO: could keep TResultContent for len(messages) == 1
                  def to_chat_message(self) -> ChatMessage[str]:
                      """Convert team response to a single chat message."""
                      # Combine all responses into one structured message
                      content = "\n\n".join(
                          f"[{response.agent_name}]: {response.message.content}"
                          for response in self
                          if response.message
                      )
                      meta = {
                          "type": "team_response",
                          "agents": [r.agent_name for r in self],
                          "duration": self.duration,
                          "success_count": len(self),
                      }
                      return ChatMessage(content=content, role="assistant", metadata=meta)  # type: ignore
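
              Since TeamResponse subclasses list[AgentResponse], its helpers compose
              with ordinary list access. A sketch of inspecting a finished team run
              ("analyst" is an illustrative agent name):

                  def report(result):
                      if not result.success:
                          print("failed:", ", ".join(result.failed_agents))
                      if (analysis := result.by_agent("analyst")) and analysis.message:
                          print(analysis.message.content)
                      print(result.format_durations())
                      # Collapse the whole run into one assistant ChatMessage:
                      summary = result.to_chat_message()
                      print(summary.metadata["type"])  # "team_response"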
              

              duration property

              duration: float
              

              Get execution duration in seconds.

              failed_agents property

              failed_agents: list[str]
              

              Names of agents that failed.

              success property

              success: bool
              

              Whether all agents completed successfully.

              by_agent

              by_agent(name: str) -> AgentResponse[TMessageContent] | None
              

              Get response from specific agent.

              Source code in src/llmling_agent/messaging/messages.py
              def by_agent(self, name: str) -> AgentResponse[TMessageContent] | None:
                  """Get response from specific agent."""
                  return next((r for r in self if r.agent_name == name), None)
              

              format_durations

              format_durations() -> str
              

              Format execution times.

              Source code in src/llmling_agent/messaging/messages.py
              def format_durations(self) -> str:
                  """Format execution times."""
                  parts = [f"{r.agent_name}: {r.timing:.2f}s" for r in self if r.timing is not None]
                  return f"Individual times: {', '.join(parts)}\nTotal time: {self.duration:.2f}s"
              

              to_chat_message

              to_chat_message() -> ChatMessage[str]
              

              Convert team response to a single chat message.

              Source code in src/llmling_agent/messaging/messages.py
              def to_chat_message(self) -> ChatMessage[str]:
                  """Convert team response to a single chat message."""
                  # Combine all responses into one structured message
                  content = "\n\n".join(
                      f"[{response.agent_name}]: {response.message.content}"
                      for response in self
                      if response.message
                  )
                  meta = {
                      "type": "team_response",
                      "agents": [r.agent_name for r in self],
                      "duration": self.duration,
                      "success_count": len(self),
                  }
                  return ChatMessage(content=content, role="assistant", metadata=meta)  # type: ignore
              

              TokenCost dataclass

              Combined token and cost tracking.

              Source code in src/llmling_agent/messaging/messages.py
              @dataclass(frozen=True)
              class TokenCost:
                  """Combined token and cost tracking."""
              
                  token_usage: RunUsage
                  """Token counts for prompt and completion"""
                  total_cost: Decimal
                  """Total cost in USD"""
              
                  @classmethod
                  async def from_usage(cls, usage: RunUsage, model: str) -> TokenCost | None:
                      """Create result from usage data.
              
                      Args:
                          usage: Token counts from model response
                          model: Name of the model used
              
              
                      Returns:
                          TokenCost if usage data available, None otherwise
                      """
                      logger.debug("Token usage", usage=usage)
                      # return cls(token_usage=token_usage, total_cost=Decimal(total_cost))
                      if model in {"None", "test"}:
                          price = Decimal(0)
                      else:
                          parts = model.split(":", 1)
                          try:
                              price_data = calc_price(
                                  usage,
                                  model_ref=parts[1] if len(parts) > 1 else parts[0],
                                  provider_id=parts[0] if len(parts) > 1 else "openai",
                              )
                              price = price_data.total_price
                          except Exception:  # noqa: BLE001
                              cost = await tokonomics.calculate_token_cost(
                                  model,
                                  usage.input_tokens,
                                  usage.output_tokens,
                              )
                              price = Decimal(cost.total_cost if cost else 0)
              
                      return cls(token_usage=usage, total_cost=price)
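
              from_usage is async because the fallback path awaits tokonomics for
              pricing. A hedged sketch; usage is whatever RunUsage object the model
              run produced, and the model string is just an example of the
              provider:model form the method splits on:

                  from decimal import Decimal

                  from llmling_agent.messaging.messages import TokenCost


                  async def price_run(usage) -> Decimal:
                      # "openai:gpt-4o" -> provider_id "openai", model_ref "gpt-4o"
                      cost = await TokenCost.from_usage(usage, model="openai:gpt-4o")
                      return cost.total_cost if cost else Decimal(0)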
              

              token_usage instance-attribute

              token_usage: RunUsage
              

              Token counts for prompt and completion

              total_cost instance-attribute

              total_cost: Decimal
              

              Total cost in USD

              from_usage async classmethod

              from_usage(usage: RunUsage, model: str) -> TokenCost | None
              

              Create result from usage data.

              Parameters:

                  usage (RunUsage): Token counts from model response. Required.
                  model (str): Name of the model used. Required.

              Returns:

                  TokenCost | None: TokenCost if usage data available, None otherwise.

              Source code in src/llmling_agent/messaging/messages.py
              @classmethod
              async def from_usage(cls, usage: RunUsage, model: str) -> TokenCost | None:
                  """Create result from usage data.
              
                  Args:
                      usage: Token counts from model response
                      model: Name of the model used
              
              
                  Returns:
                      TokenCost if usage data available, None otherwise
                  """
                  logger.debug("Token usage", usage=usage)
                  # return cls(token_usage=token_usage, total_cost=Decimal(total_cost))
                  if model in {"None", "test"}:
                      price = Decimal(0)
                  else:
                      parts = model.split(":", 1)
                      try:
                          price_data = calc_price(
                              usage,
                              model_ref=parts[1] if len(parts) > 1 else parts[0],
                              provider_id=parts[0] if len(parts) > 1 else "openai",
                          )
                          price = price_data.total_price
                      except Exception:  # noqa: BLE001
                          cost = await tokonomics.calculate_token_cost(
                              model,
                              usage.input_tokens,
                              usage.output_tokens,
                          )
                          price = Decimal(cost.total_cost if cost else 0)
              
                  return cls(token_usage=usage, total_cost=price)