Skip to content

messaging

Class info

Classes

Name Children Inherits
AgentResponse
llmling_agent.messaging.messages
Result from an agent's execution.
    ChatMessage
    llmling_agent.messaging.messages
    Common message format for all UI types.
      ChatMessageContainer
      llmling_agent.messaging.message_container
      Container for tracking and managing chat messages.
        EventManager
        llmling_agent.messaging.event_manager
        Manages multiple event sources and their lifecycles.
          MessageNode
          llmling_agent.messaging.messagenode
          Base class for all message processing nodes.
          TeamResponse
          llmling_agent.messaging.messages
          Results from a team execution.
            TokenCost
            llmling_agent.messaging.messages
            Combined token and cost tracking.

              🛈 DocStrings

              Core messaging classes for LLMling agent.

              AgentResponse dataclass

              Result from an agent's execution.

              Source code in src/llmling_agent/messaging/messages.py
              419
              420
              421
              422
              423
              424
              425
              426
              427
              428
              429
              430
              431
              432
              433
              434
              435
              436
              437
              438
              439
              440
              441
              442
              443
              @dataclass
              class AgentResponse[TResult]:
                  """Result from an agent's execution."""
              
                  agent_name: str
                  """Name of the agent that produced this result"""
              
                  message: ChatMessage[TResult] | None
                  """The actual message with content and metadata"""
              
                  timing: float | None = None
                  """Time taken by this agent in seconds"""
              
                  error: str | None = None
                  """Error message if agent failed"""
              
                  @property
                  def success(self) -> bool:
                      """Whether the agent completed successfully."""
                      return self.error is None
              
                  @property
                  def response(self) -> TResult | None:
                      """Convenient access to message content."""
                      return self.message.content if self.message else None
              

              agent_name instance-attribute

              agent_name: str
              

              Name of the agent that produced this result

              error class-attribute instance-attribute

              error: str | None = None
              

              Error message if agent failed

              message instance-attribute

              message: ChatMessage[TResult] | None
              

              The actual message with content and metadata

              response property

              response: TResult | None
              

              Convenient access to message content.

              success property

              success: bool
              

              Whether the agent completed successfully.

              timing class-attribute instance-attribute

              timing: float | None = None
              

              Time taken by this agent in seconds

              ChatMessage dataclass

              Common message format for all UI types.

              Generically typed with: ChatMessage[Type of Content] The type can either be str or a BaseModel subclass.

              Source code in src/llmling_agent/messaging/messages.py
              151
              152
              153
              154
              155
              156
              157
              158
              159
              160
              161
              162
              163
              164
              165
              166
              167
              168
              169
              170
              171
              172
              173
              174
              175
              176
              177
              178
              179
              180
              181
              182
              183
              184
              185
              186
              187
              188
              189
              190
              191
              192
              193
              194
              195
              196
              197
              198
              199
              200
              201
              202
              203
              204
              205
              206
              207
              208
              209
              210
              211
              212
              213
              214
              215
              216
              217
              218
              219
              220
              221
              222
              223
              224
              225
              226
              227
              228
              229
              230
              231
              232
              233
              234
              235
              236
              237
              238
              239
              240
              241
              242
              243
              244
              245
              246
              247
              248
              249
              250
              251
              252
              253
              254
              255
              256
              257
              258
              259
              260
              261
              262
              263
              264
              265
              266
              267
              268
              269
              270
              271
              272
              273
              274
              275
              276
              277
              278
              279
              280
              281
              282
              283
              284
              285
              286
              287
              288
              289
              290
              291
              292
              293
              294
              295
              296
              297
              298
              299
              300
              301
              302
              303
              304
              305
              306
              307
              308
              309
              310
              311
              312
              313
              314
              315
              316
              317
              318
              319
              320
              321
              322
              323
              324
              325
              326
              327
              328
              329
              330
              331
              332
              333
              334
              335
              336
              337
              338
              339
              340
              341
              342
              343
              344
              345
              346
              347
              348
              349
              350
              351
              352
              353
              354
              355
              356
              357
              358
              359
              360
              361
              362
              363
              364
              365
              366
              367
              368
              369
              370
              371
              372
              373
              374
              375
              376
              377
              378
              379
              380
              381
              382
              383
              384
              385
              386
              387
              388
              389
              390
              391
              392
              393
              394
              395
              396
              397
              398
              399
              400
              401
              402
              403
              404
              405
              406
              407
              408
              409
              410
              411
              412
              413
              414
              415
              416
              @dataclass
              class ChatMessage[TContent]:
                  """Common message format for all UI types.
              
                  Generically typed with: ChatMessage[Type of Content]
                  The type can either be str or a BaseModel subclass.
                  """
              
                  content: TContent
                  """Message content, typed as TContent (either str or BaseModel)."""
              
                  role: MessageRole
                  """Role of the message sender (user/assistant)."""
              
                  metadata: SimpleJsonType = field(default_factory=dict)
                  """Additional metadata about the message."""
              
                  timestamp: datetime = field(default_factory=get_now)
                  """When this message was created."""
              
                  cost_info: TokenCost | None = None
                  """Token usage and costs for this specific message if available."""
              
                  message_id: str = field(default_factory=lambda: str(uuid4()))
                  """Unique identifier for this message."""
              
                  conversation_id: str | None = None
                  """ID of the conversation this message belongs to."""
              
                  response_time: float | None = None
                  """Time it took the LLM to respond."""
              
                  tool_calls: list[ToolCallInfo] = field(default_factory=list)
                  """List of tool calls made during message generation."""
              
                  associated_messages: list[ChatMessage[Any]] = field(default_factory=list)
                  """List of messages which were generated during the the creation of this messsage."""
              
                  name: str | None = None
                  """Display name for the message sender in UI."""
              
                  forwarded_from: list[str] = field(default_factory=list)
                  """List of agent names (the chain) that forwarded this message to the sender."""
              
                  provider_details: dict[str, Any] = field(default_factory=dict)
                  """Provider specific metadata / extra information."""
              
                  parts: Sequence[ModelResponsePart | ModelRequestPart] = field(default_factory=list)
                  """The parts of the model message."""
              
                  usage: RequestUsage = field(default_factory=RequestUsage)
                  """Usage information for the request.
              
                  This has a default to make tests easier,
                  and to support loading old messages where usage will be missing.
                  """
              
                  model_name: str | None = None
                  """The name of the model that generated the response."""
              
                  provider_name: str | None = None
                  """The name of the LLM provider that generated the response."""
              
                  provider_response_id: str | None = None
                  """request ID as specified by the model provider.
              
                  This can be used to track the specific request to the model."""
              
                  finish_reason: FinishReason | None = None
                  """Reason the model finished generating the response.
              
                  Normalized to OpenTelemetry values."""
              
                  @property
                  def kind(self) -> Literal["request", "response"]:
                      """Role of the message."""
                      match self.role:
                          case "assistant":
                              return "response"
                          case "user":
                              return "request"
              
                  def to_pydantic_ai(self) -> ModelRequest | ModelResponse:
                      """Convert this message to a Pydantic model."""
                      match self.kind:
                          case "request":
                              return ModelRequest(parts=self.parts, instructions=None)  # type: ignore
                          case "response":
                              return ModelResponse(
                                  parts=self.parts,  # type: ignore
                                  usage=self.usage,
                                  model_name=self.model_name,
                                  timestamp=self.timestamp,
                                  provider_name=self.provider_name,
                                  provider_details=self.provider_details,
                                  finish_reason=self.finish_reason,
                                  provider_response_id=self.provider_response_id,
                              )
              
                  @classmethod
                  def from_pydantic_ai[TContentType](
                      cls,
                      content: TContentType,
                      message: ModelRequest | ModelResponse,
                      conversation_id: str | None = None,
                      name: str | None = None,
                      message_id: str | None = None,
                      forwarded_from: list[str] | None = None,
                  ) -> ChatMessage[TContentType]:
                      """Convert a Pydantic model to a ChatMessage."""
                      match message:
                          case ModelRequest(parts=parts, instructions=_instructions):
                              return ChatMessage(
                                  parts=parts,
                                  content=content,
                                  role="user" if message.kind == "request" else "assistant",
                                  message_id=message_id or str(uuid.uuid4()),
                                  # instructions=instructions,
                                  forwarded_from=forwarded_from or [],
                                  name=name,
                              )
                          case ModelResponse(
                              parts=parts,
                              usage=usage,
                              model_name=model_name,
                              timestamp=timestamp,
                              provider_name=provider_name,
                              provider_details=provider_details,
                              finish_reason=finish_reason,
                              provider_response_id=provider_response_id,
                          ):
                              return ChatMessage(
                                  role="user" if message.kind == "request" else "assistant",
                                  content=content,
                                  parts=parts,
                                  usage=usage,
                                  message_id=message_id or str(uuid.uuid4()),
                                  conversation_id=conversation_id,
                                  model_name=model_name,
                                  timestamp=timestamp,
                                  provider_name=provider_name,
                                  provider_details=provider_details or {},
                                  finish_reason=finish_reason,
                                  provider_response_id=provider_response_id,
                                  name=name,
                                  forwarded_from=forwarded_from or [],
                              )
                          case _:
                              msg = f"Unknown message kind: {message.kind}"
                              raise ValueError(msg)
              
                  def forwarded(self, previous_message: ChatMessage[Any]) -> Self:
                      """Create new message showing it was forwarded from another message.
              
                      Args:
                          previous_message: The message that led to this one's creation
              
                      Returns:
                          New message with updated chain showing the path through previous message
                      """
                      from_ = [*previous_message.forwarded_from, previous_message.name or "unknown"]
                      return replace(self, forwarded_from=from_)
              
                  def to_text_message(self) -> ChatMessage[str]:
                      """Convert this message to a text-only version."""
                      return dataclasses.replace(self, content=str(self.content))  # type: ignore
              
                  def to_request(self) -> Self:
                      """Convert this message to a request message.
              
                      If the message is already a request (user role), this is a no-op.
                      If it's a response (assistant role), converts response parts to user content.
              
                      Returns:
                          New ChatMessage with role='user' and converted parts
                      """
                      if self.role == "user":
                          # Already a request, return as-is
                          return self
              
                      # Convert response parts to user content
                      converted_parts: list[Any] = []
                      user_content: list[UserContent] = []
              
                      for part in self.parts:
                          match part:
                              case TextPart(content=text_content):
                                  # Text parts become user content strings
                                  user_content.append(text_content)
                              case FilePart(content=binary_content):
                                  # File parts (images, etc.) become user content directly
                                  user_content.append(binary_content)
                              case _:
                                  # Other parts (tool calls, etc.) are kept as-is for now
                                  # Could be extended to handle more conversion cases
                                  pass
              
                      # Create new UserPromptPart with converted content
                      if user_content:
                          if len(user_content) == 1 and isinstance(user_content[0], str):
                              # Single string content
                              converted_parts = [UserPromptPart(content=user_content[0])]
                          else:
                              # Multi-modal content
                              converted_parts = [UserPromptPart(content=user_content)]
                      else:
                          # Fallback to text representation if no convertible parts
                          converted_parts = [UserPromptPart(content=str(self.content))]
              
                      return replace(self, role="user", parts=converted_parts, cost_info=None)
              
                  @property
                  def data(self) -> TContent:
                      """Get content as typed data. Provides compat to AgentRunResult."""
                      return self.content
              
                  def format(
                      self,
                      style: FormatStyle = "simple",
                      *,
                      template: str | None = None,
                      variables: dict[str, Any] | None = None,
                      show_metadata: bool = False,
                      show_costs: bool = False,
                  ) -> str:
                      """Format message with configurable style.
              
                      Args:
                          style: Predefined style or "custom" for custom template
                          template: Custom Jinja template (required if style="custom")
                          variables: Additional variables for template rendering
                          show_metadata: Whether to include metadata
                          show_costs: Whether to include cost information
              
                      Raises:
                          ValueError: If style is "custom" but no template provided
                                  or if style is invalid
                      """
                      from jinjarope import Environment
                      import yamling
              
                      env = Environment(trim_blocks=True, lstrip_blocks=True)
                      env.filters["to_yaml"] = yamling.dump_yaml
              
                      match style:
                          case "custom":
                              if not template:
                                  msg = "Custom style requires a template"
                                  raise ValueError(msg)
                              template_str = template
                          case _ if style in MESSAGE_TEMPLATES:
                              template_str = MESSAGE_TEMPLATES[style]
                          case _:
                              msg = f"Invalid style: {style}"
                              raise ValueError(msg)
                      template_obj = env.from_string(template_str)
                      vars_ = {
                          **(self.__dict__),
                          "show_metadata": show_metadata,
                          "show_costs": show_costs,
                      }
                      print(vars_)
                      if variables:
                          vars_.update(variables)
              
                      return template_obj.render(**vars_)
              

              associated_messages class-attribute instance-attribute

              associated_messages: list[ChatMessage[Any]] = field(default_factory=list)
              

              List of messages which were generated during the creation of this message.

              content instance-attribute

              content: TContent
              

              Message content, typed as TContent (either str or BaseModel).

              conversation_id class-attribute instance-attribute

              conversation_id: str | None = None
              

              ID of the conversation this message belongs to.

              cost_info class-attribute instance-attribute

              cost_info: TokenCost | None = None
              

              Token usage and costs for this specific message if available.

              data property

              data: TContent
              

              Get content as typed data. Provides compat to AgentRunResult.

              finish_reason class-attribute instance-attribute

              finish_reason: FinishReason | None = None
              

              Reason the model finished generating the response.

              Normalized to OpenTelemetry values.

              forwarded_from class-attribute instance-attribute

              forwarded_from: list[str] = field(default_factory=list)
              

              List of agent names (the chain) that forwarded this message to the sender.

              kind property

              kind: Literal['request', 'response']
              

              Role of the message.

              message_id class-attribute instance-attribute

              message_id: str = field(default_factory=lambda: str(uuid4()))
              

              Unique identifier for this message.

              metadata class-attribute instance-attribute

              metadata: SimpleJsonType = field(default_factory=dict)
              

              Additional metadata about the message.

              model_name class-attribute instance-attribute

              model_name: str | None = None
              

              The name of the model that generated the response.

              name class-attribute instance-attribute

              name: str | None = None
              

              Display name for the message sender in UI.

              parts class-attribute instance-attribute

              parts: Sequence[ModelResponsePart | ModelRequestPart] = field(default_factory=list)
              

              The parts of the model message.

              provider_details class-attribute instance-attribute

              provider_details: dict[str, Any] = field(default_factory=dict)
              

              Provider specific metadata / extra information.

              provider_name class-attribute instance-attribute

              provider_name: str | None = None
              

              The name of the LLM provider that generated the response.

              provider_response_id class-attribute instance-attribute

              provider_response_id: str | None = None
              

              Request ID as specified by the model provider.

              This can be used to track the specific request to the model.

              response_time class-attribute instance-attribute

              response_time: float | None = None
              

              Time it took the LLM to respond.

              role instance-attribute

              role: MessageRole
              

              Role of the message sender (user/assistant).

              timestamp class-attribute instance-attribute

              timestamp: datetime = field(default_factory=get_now)
              

              When this message was created.

              tool_calls class-attribute instance-attribute

              tool_calls: list[ToolCallInfo] = field(default_factory=list)
              

              List of tool calls made during message generation.

              usage class-attribute instance-attribute

              usage: RequestUsage = field(default_factory=RequestUsage)
              

              Usage information for the request.

              This has a default to make tests easier, and to support loading old messages where usage will be missing.

              format

              format(
                  style: FormatStyle = "simple",
                  *,
                  template: str | None = None,
                  variables: dict[str, Any] | None = None,
                  show_metadata: bool = False,
                  show_costs: bool = False,
              ) -> str
              

              Format message with configurable style.

              Parameters:

              Name Type Description Default
              style FormatStyle

              Predefined style or "custom" for custom template

              'simple'
              template str | None

              Custom Jinja template (required if style="custom")

              None
              variables dict[str, Any] | None

              Additional variables for template rendering

              None
              show_metadata bool

              Whether to include metadata

              False
              show_costs bool

              Whether to include cost information

              False

              Raises:

              Type Description
              ValueError

              If style is "custom" but no template provided or if style is invalid

              Source code in src/llmling_agent/messaging/messages.py
              367
              368
              369
              370
              371
              372
              373
              374
              375
              376
              377
              378
              379
              380
              381
              382
              383
              384
              385
              386
              387
              388
              389
              390
              391
              392
              393
              394
              395
              396
              397
              398
              399
              400
              401
              402
              403
              404
              405
              406
              407
              408
              409
              410
              411
              412
              413
              414
              415
              416
              def format(
                  self,
                  style: FormatStyle = "simple",
                  *,
                  template: str | None = None,
                  variables: dict[str, Any] | None = None,
                  show_metadata: bool = False,
                  show_costs: bool = False,
              ) -> str:
                  """Format message with configurable style.
              
                  Args:
                      style: Predefined style or "custom" for custom template
                      template: Custom Jinja template (required if style="custom")
                      variables: Additional variables for template rendering
                      show_metadata: Whether to include metadata
                      show_costs: Whether to include cost information
              
                  Raises:
                      ValueError: If style is "custom" but no template provided
                              or if style is invalid
                  """
                  from jinjarope import Environment
                  import yamling
              
                  env = Environment(trim_blocks=True, lstrip_blocks=True)
                  env.filters["to_yaml"] = yamling.dump_yaml
              
                  match style:
                      case "custom":
                          if not template:
                              msg = "Custom style requires a template"
                              raise ValueError(msg)
                          template_str = template
                      case _ if style in MESSAGE_TEMPLATES:
                          template_str = MESSAGE_TEMPLATES[style]
                      case _:
                          msg = f"Invalid style: {style}"
                          raise ValueError(msg)
                  template_obj = env.from_string(template_str)
                  vars_ = {
                      **(self.__dict__),
                      "show_metadata": show_metadata,
                      "show_costs": show_costs,
                  }
                  print(vars_)
                  if variables:
                      vars_.update(variables)
              
                  return template_obj.render(**vars_)
              

              forwarded

              forwarded(previous_message: ChatMessage[Any]) -> Self
              

              Create new message showing it was forwarded from another message.

              Parameters:

              Name Type Description Default
              previous_message ChatMessage[Any]

              The message that led to this one's creation

              required

              Returns:

              Type Description
              Self

              New message with updated chain showing the path through previous message

              Source code in src/llmling_agent/messaging/messages.py
              302
              303
              304
              305
              306
              307
              308
              309
              310
              311
              312
              def forwarded(self, previous_message: ChatMessage[Any]) -> Self:
                  """Create new message showing it was forwarded from another message.

                  Args:
                      previous_message: The message that led to this one's creation

                  Returns:
                      New message with updated chain showing the path through previous message
                  """
                  # Extend the forwarding chain with the sender of the previous message
                  chain = list(previous_message.forwarded_from)
                  chain.append(previous_message.name or "unknown")
                  return replace(self, forwarded_from=chain)
              

              from_pydantic_ai classmethod

              from_pydantic_ai(
                  content: TContentType,
                  message: ModelRequest | ModelResponse,
                  conversation_id: str | None = None,
                  name: str | None = None,
                  message_id: str | None = None,
                  forwarded_from: list[str] | None = None,
              ) -> ChatMessage[TContentType]
              

              Convert a Pydantic model to a ChatMessage.

              Source code in src/llmling_agent/messaging/messages.py
              250
              251
              252
              253
              254
              255
              256
              257
              258
              259
              260
              261
              262
              263
              264
              265
              266
              267
              268
              269
              270
              271
              272
              273
              274
              275
              276
              277
              278
              279
              280
              281
              282
              283
              284
              285
              286
              287
              288
              289
              290
              291
              292
              293
              294
              295
              296
              297
              298
              299
              300
              @classmethod
              def from_pydantic_ai[TContentType](
                  cls,
                  content: TContentType,
                  message: ModelRequest | ModelResponse,
                  conversation_id: str | None = None,
                  name: str | None = None,
                  message_id: str | None = None,
                  forwarded_from: list[str] | None = None,
              ) -> ChatMessage[TContentType]:
                  """Convert a Pydantic model to a ChatMessage."""
                  match message:
                      case ModelRequest(parts=parts, instructions=_instructions):
                          return ChatMessage(
                              parts=parts,
                              content=content,
                              role="user" if message.kind == "request" else "assistant",
                              message_id=message_id or str(uuid.uuid4()),
                              # instructions=instructions,
                              forwarded_from=forwarded_from or [],
                              name=name,
                          )
                      case ModelResponse(
                          parts=parts,
                          usage=usage,
                          model_name=model_name,
                          timestamp=timestamp,
                          provider_name=provider_name,
                          provider_details=provider_details,
                          finish_reason=finish_reason,
                          provider_response_id=provider_response_id,
                      ):
                          return ChatMessage(
                              role="user" if message.kind == "request" else "assistant",
                              content=content,
                              parts=parts,
                              usage=usage,
                              message_id=message_id or str(uuid.uuid4()),
                              conversation_id=conversation_id,
                              model_name=model_name,
                              timestamp=timestamp,
                              provider_name=provider_name,
                              provider_details=provider_details or {},
                              finish_reason=finish_reason,
                              provider_response_id=provider_response_id,
                              name=name,
                              forwarded_from=forwarded_from or [],
                          )
                      case _:
                          msg = f"Unknown message kind: {message.kind}"
                          raise ValueError(msg)
              

              to_pydantic_ai

              to_pydantic_ai() -> ModelRequest | ModelResponse
              

              Convert this message to a Pydantic model.

              Source code in src/llmling_agent/messaging/messages.py
              233
              234
              235
              236
              237
              238
              239
              240
              241
              242
              243
              244
              245
              246
              247
              248
              def to_pydantic_ai(self) -> ModelRequest | ModelResponse:
                  """Convert this message to a Pydantic model."""
                  match self.kind:
                      case "request":
                          return ModelRequest(parts=self.parts, instructions=None)  # type: ignore
                      case "response":
                          return ModelResponse(
                              parts=self.parts,  # type: ignore
                              usage=self.usage,
                              model_name=self.model_name,
                              timestamp=self.timestamp,
                              provider_name=self.provider_name,
                              provider_details=self.provider_details,
                              finish_reason=self.finish_reason,
                              provider_response_id=self.provider_response_id,
                          )
              

              to_request

              to_request() -> Self
              

              Convert this message to a request message.

              If the message is already a request (user role), this is a no-op. If it's a response (assistant role), converts response parts to user content.

              Returns:

              Type Description
              Self

              New ChatMessage with role='user' and converted parts

              Source code in src/llmling_agent/messaging/messages.py
              318
              319
              320
              321
              322
              323
              324
              325
              326
              327
              328
              329
              330
              331
              332
              333
              334
              335
              336
              337
              338
              339
              340
              341
              342
              343
              344
              345
              346
              347
              348
              349
              350
              351
              352
              353
              354
              355
              356
              357
              358
              359
              360
              def to_request(self) -> Self:
                  """Convert this message to a request message.

                  If the message is already a request (user role), this is a no-op.
                  If it's a response (assistant role), converts response parts to user content.

                  Returns:
                      New ChatMessage with role='user' and converted parts
                  """
                  if self.role == "user":
                      return self  # already a request, nothing to convert

                  # Collect user-facing content from the response parts
                  collected: list[UserContent] = []
                  for item in self.parts:
                      if isinstance(item, TextPart):
                          # Text parts become user content strings
                          collected.append(item.content)
                      elif isinstance(item, FilePart):
                          # File parts (images, etc.) become user content directly
                          collected.append(item.content)
                      # Other parts (tool calls, etc.) are intentionally dropped for now;
                      # could be extended to handle more conversion cases

                  # Build the replacement UserPromptPart from what was collected
                  if not collected:
                      # Fallback to text representation if no convertible parts
                      new_parts: list[Any] = [UserPromptPart(content=str(self.content))]
                  elif len(collected) == 1 and isinstance(collected[0], str):
                      # Single string content
                      new_parts = [UserPromptPart(content=collected[0])]
                  else:
                      # Multi-modal content
                      new_parts = [UserPromptPart(content=collected)]

                  return replace(self, role="user", parts=new_parts, cost_info=None)
              

              to_text_message

              to_text_message() -> ChatMessage[str]
              

              Convert this message to a text-only version.

              Source code in src/llmling_agent/messaging/messages.py
              314
              315
              316
              def to_text_message(self) -> ChatMessage[str]:
                  """Convert this message to a text-only version."""
                  text_content = str(self.content)
                  return dataclasses.replace(self, content=text_content)  # type: ignore
              

              ChatMessageContainer

              Bases: EventedList[ChatMessage[Any]]

              Container for tracking and managing chat messages.

              Extends EventedList to provide: - Message statistics (tokens, costs) - History formatting - Token-aware context window management - Role-based filtering

              Source code in src/llmling_agent/messaging/message_container.py
               28
               29
               30
               31
               32
               33
               34
               35
               36
               37
               38
               39
               40
               41
               42
               43
               44
               45
               46
               47
               48
               49
               50
               51
               52
               53
               54
               55
               56
               57
               58
               59
               60
               61
               62
               63
               64
               65
               66
               67
               68
               69
               70
               71
               72
               73
               74
               75
               76
               77
               78
               79
               80
               81
               82
               83
               84
               85
               86
               87
               88
               89
               90
               91
               92
               93
               94
               95
               96
               97
               98
               99
              100
              101
              102
              103
              104
              105
              106
              107
              108
              109
              110
              111
              112
              113
              114
              115
              116
              117
              118
              119
              120
              121
              122
              123
              124
              125
              126
              127
              128
              129
              130
              131
              132
              133
              134
              135
              136
              137
              138
              139
              140
              141
              142
              143
              144
              145
              146
              147
              148
              149
              150
              151
              152
              153
              154
              155
              156
              157
              158
              159
              160
              161
              162
              163
              164
              165
              166
              167
              168
              169
              170
              171
              172
              173
              174
              175
              176
              177
              178
              179
              180
              181
              182
              183
              184
              185
              186
              187
              188
              189
              190
              191
              192
              193
              194
              195
              196
              197
              198
              199
              200
              201
              202
              203
              204
              205
              206
              207
              208
              209
              210
              211
              212
              213
              214
              215
              216
              class ChatMessageContainer(EventedList[ChatMessage[Any]]):
                  """Container for tracking and managing chat messages.

                  Extends EventedList to provide:
                  - Message statistics (tokens, costs)
                  - History formatting
                  - Token-aware context window management
                  - Role-based filtering
                  """

                  def get_message_tokens(self, message: ChatMessage[Any]) -> int:
                      """Get token count for a single message.

                      Uses cost_info if available, falls back to tiktoken estimation.

                      Args:
                          message: Message to count tokens for

                      Returns:
                          Token count for the message
                      """
                      if message.cost_info:
                          return message.cost_info.token_usage.total_tokens
                      # Estimate from the message content, matching get_history_tokens.
                      # The previous code tokenized str(message.usage.total_tokens) -
                      # the numeric count itself - which always gave a meaningless
                      # 1-2 token result.
                      return count_tokens(str(message.content), message.model_name)

                  def get_history_tokens(self, fallback_model: str | None = None) -> int:
                      """Get total token count for all messages.

                      Uses cost_info when available, falls back to tiktoken estimation
                      for messages without usage information.

                      Args:
                          fallback_model: Model used for estimation when no message
                              carries a model name

                      Returns:
                          Total token count across all messages
                      """
                      # Use cost_info if available
                      total = sum(m.cost_info.token_usage.total_tokens for m in self if m.cost_info)

                      # For messages without cost_info, estimate using tiktoken
                      if msgs := [msg for msg in self if not msg.cost_info]:
                          if fallback_model:
                              model_name = fallback_model
                          else:
                              model_name = next(
                                  (m.model_name for m in self if m.model_name), DEFAULT_TOKEN_MODEL
                              )
                          contents = [str(msg.content) for msg in msgs]
                          total += sum(batch_count_tokens(contents, model_name))

                      return total

                  def get_total_cost(self) -> float:
                      """Calculate total cost in USD across all messages.

                      Only includes messages with cost information.

                      Returns:
                          Total cost in USD
                      """
                      return sum(float(msg.cost_info.total_cost) for msg in self if msg.cost_info)

                  @property
                  def last_message(self) -> ChatMessage[Any] | None:
                      """Get most recent message or None if empty."""
                      return self[-1] if self else None

                  def format(self, *, style: FormatStyle = "simple", **kwargs: Any) -> str:
                      """Format conversation history with configurable style.

                      Args:
                          style: Formatting style to use
                          **kwargs: Additional formatting options passed to message.format()

                      Returns:
                          Formatted conversation history as string
                      """
                      return "\n".join(msg.format(style=style, **kwargs) for msg in self)

                  def filter_by_role(
                      self,
                      role: MessageRole,
                      *,
                      max_messages: int | None = None,
                  ) -> list[ChatMessage[Any]]:
                      """Get messages with specific role.

                      Args:
                          role: Role to filter by (user/assistant/system)
                          max_messages: Optional limit on number of messages to return

                      Returns:
                          List of messages with matching role
                      """
                      messages = [msg for msg in self if msg.role == role]
                      if max_messages:
                          messages = messages[-max_messages:]
                      return messages

                  def get_between(
                      self,
                      *,
                      start_time: datetime | None = None,
                      end_time: datetime | None = None,
                  ) -> list[ChatMessage[Any]]:
                      """Get messages within a time range.

                      Args:
                          start_time: Optional start of range (inclusive)
                          end_time: Optional end of range (inclusive)

                      Returns:
                          List of messages within the time range
                      """
                      messages = list(self)
                      if start_time:
                          messages = [msg for msg in messages if msg.timestamp >= start_time]
                      if end_time:
                          messages = [msg for msg in messages if msg.timestamp <= end_time]
                      return messages

                  def _build_flow_dag(self, message: ChatMessage[Any]) -> DAGNode | None:
                      """Build DAG from message flow.

                      Args:
                          message: Message to build flow DAG for

                      Returns:
                          Root DAGNode of the graph, or None when no forwarding
                          chains exist in the conversation
                      """
                      from bigtree import DAGNode

                      # Get messages from this conversation
                      conv_messages = [m for m in self if m.conversation_id == message.conversation_id]
                      nodes: dict[str, DAGNode] = {}
                      for msg in conv_messages:  # First create all nodes
                          if msg.forwarded_from:
                              chain = [*msg.forwarded_from, msg.name or "unknown"]
                              for name in chain:
                                  if name not in nodes:
                                      nodes[name] = DAGNode(name)

                      # Then set up parent relationships
                      for msg in conv_messages:
                          if msg.forwarded_from:
                              chain = [*msg.forwarded_from, msg.name or "unknown"]
                              # Connect consecutive nodes
                              for parent_name, child_name in itertools.pairwise(chain):
                                  parent = nodes[parent_name]
                                  child = nodes[child_name]
                                  if parent not in child.parents:
                                      child.parents = [*child.parents, parent]

                      # Find root nodes (those without parents)
                      roots = [node for node in nodes.values() if not node.parents]
                      if not roots:
                          return None
                      return roots[0]  # Return first root for now

                  def to_mermaid_graph(
                      self,
                      message: ChatMessage[Any],
                      *,
                      title: str = "",
                      theme: str | None = None,
                      rankdir: Literal["TB", "BT", "LR", "RL"] = "LR",
                  ) -> str:
                      """Convert message flow to mermaid graph.

                      Args:
                          message: Message whose conversation flow is rendered
                          title: Optional graph title
                          theme: Optional mermaid theme name
                          rankdir: Flowchart direction

                      Returns:
                          Mermaid flowchart as a fenced code block, or "" when
                          the conversation has no forwarding flow
                      """
                      from bigtree import dag_to_list

                      dag = self._build_flow_dag(message)
                      if not dag:
                          return ""

                      # Get list of connections
                      connections = dag_to_list(dag)

                      # Convert to mermaid
                      lines = ["```mermaid"]
                      if title:
                          lines.extend(["---", f"title: {title}", "---"])
                      if theme:
                          lines.append(f'%%{{ init: {{ "theme": "{theme}" }} }}%%')
                      lines.append(f"flowchart {rankdir}")

                      # Add connections
                      lines.extend(f"    {parent}-->{child}" for parent, child in connections)

                      lines.append("```")
                      return "\n".join(lines)
              

              last_message property

              last_message: ChatMessage[Any] | None
              

              Get most recent message or None if empty.

              filter_by_role

              filter_by_role(
                  role: MessageRole, *, max_messages: int | None = None
              ) -> list[ChatMessage[Any]]
              

              Get messages with specific role.

              Parameters:

              Name Type Description Default
              role MessageRole

              Role to filter by (user/assistant/system)

              required
              max_messages int | None

              Optional limit on number of messages to return

              None

              Returns:

              Type Description
              list[ChatMessage[Any]]

              List of messages with matching role

              Source code in src/llmling_agent/messaging/message_container.py
              105
              106
              107
              108
              109
              110
              111
              112
              113
              114
              115
              116
              117
              118
              119
              120
              121
              122
              123
              def filter_by_role(
                  self,
                  role: MessageRole,
                  *,
                  max_messages: int | None = None,
              ) -> list[ChatMessage[Any]]:
                  """Get messages with specific role.

                  Args:
                      role: Role to filter by (user/assistant/system)
                      max_messages: Optional limit on number of messages to return

                  Returns:
                      List of messages with matching role
                  """
                  matching: list[ChatMessage[Any]] = []
                  for msg in self:
                      if msg.role == role:
                          matching.append(msg)
                  # Keep only the most recent max_messages entries when a limit is set
                  return matching[-max_messages:] if max_messages else matching
              

              format

              format(*, style: FormatStyle = 'simple', **kwargs: Any) -> str
              

              Format conversation history with configurable style.

              Parameters:

              Name Type Description Default
              style FormatStyle

              Formatting style to use

              'simple'
              **kwargs Any

              Additional formatting options passed to message.format()

              {}

              Returns:

              Type Description
              str

              Formatted conversation history as string

              Source code in src/llmling_agent/messaging/message_container.py
               93
               94
               95
               96
               97
               98
               99
              100
              101
              102
              103
              def format(self, *, style: FormatStyle = "simple", **kwargs: Any) -> str:
                  """Format conversation history with configurable style.

                  Args:
                      style: Formatting style to use
                      **kwargs: Additional formatting options passed to message.format()

                  Returns:
                      Formatted conversation history as string
                  """
                  formatted = [msg.format(style=style, **kwargs) for msg in self]
                  return "\n".join(formatted)
              

              get_between

              get_between(
                  *, start_time: datetime | None = None, end_time: datetime | None = None
              ) -> list[ChatMessage[Any]]
              

              Get messages within a time range.

              Parameters:

              Name Type Description Default
              start_time datetime | None

              Optional start of range

              None
              end_time datetime | None

              Optional end of range

              None

              Returns:

              Type Description
              list[ChatMessage[Any]]

              List of messages within the time range

              Source code in src/llmling_agent/messaging/message_container.py
              125
              126
              127
              128
              129
              130
              131
              132
              133
              134
              135
              136
              137
              138
              139
              140
              141
              142
              143
              144
              145
              def get_between(
                  self,
                  *,
                  start_time: datetime | None = None,
                  end_time: datetime | None = None,
              ) -> list[ChatMessage[Any]]:
                  """Get messages within a time range.

                  Args:
                      start_time: Optional start of range
                      end_time: Optional end of range

                  Returns:
                      List of messages within the time range
                  """
                  result: list[ChatMessage[Any]] = []
                  for msg in self:
                      # Both bounds are inclusive; unset bounds are ignored
                      if start_time and msg.timestamp < start_time:
                          continue
                      if end_time and msg.timestamp > end_time:
                          continue
                      result.append(msg)
                  return result
              

              get_history_tokens

              get_history_tokens(fallback_model: str | None = None) -> int
              

              Get total token count for all messages.

              Uses cost_info when available, falls back to tiktoken estimation for messages without usage information.

              Returns:

              Type Description
              int

              Total token count across all messages

              Source code in src/llmling_agent/messaging/message_container.py
              53
              54
              55
              56
              57
              58
              59
              60
              61
              62
              63
              64
              65
              66
              67
              68
              69
              70
              71
              72
              73
              74
              75
              76
              def get_history_tokens(self, fallback_model: str | None = None) -> int:
                  """Get total token count for all messages.

                  Uses cost_info when available, falls back to tiktoken estimation
                  for messages without usage information.

                  Args:
                      fallback_model: Model used for estimation when no message
                          carries a model name

                  Returns:
                      Total token count across all messages
                  """
                  counted = [m for m in self if m.cost_info]
                  uncounted = [m for m in self if not m.cost_info]

                  # Exact counts from cost_info where we have them
                  total = 0
                  for msg in counted:
                      total += msg.cost_info.token_usage.total_tokens

                  # Estimate the rest with tiktoken
                  if uncounted:
                      if fallback_model:
                          model_name = fallback_model
                      else:
                          # First model name seen in the history, else the default
                          model_name = next(
                              (m.model_name for m in self if m.model_name), DEFAULT_TOKEN_MODEL
                          )
                      texts = [str(msg.content) for msg in uncounted]
                      total += sum(batch_count_tokens(texts, model_name))

                  return total
              

              get_message_tokens

              get_message_tokens(message: ChatMessage[Any]) -> int
              

              Get token count for a single message.

              Uses cost_info if available, falls back to tiktoken estimation.

              Parameters:

              Name Type Description Default
              message ChatMessage[Any]

              Message to count tokens for

              required

              Returns:

              Type Description
              int

              Token count for the message

              Source code in src/llmling_agent/messaging/message_container.py
              38
              39
              40
              41
              42
              43
              44
              45
              46
              47
              48
              49
              50
              51
              def get_message_tokens(self, message: ChatMessage[Any]) -> int:
                  """Get token count for a single message.

                  Uses cost_info if available, falls back to tiktoken estimation.

                  Args:
                      message: Message to count tokens for

                  Returns:
                      Token count for the message
                  """
                  if message.cost_info:
                      return message.cost_info.token_usage.total_tokens
                  # Estimate from the message content, matching get_history_tokens.
                  # The previous code tokenized str(message.usage.total_tokens) -
                  # the numeric count itself - which always gave a meaningless
                  # 1-2 token result.
                  return count_tokens(str(message.content), message.model_name)
              

              get_total_cost

              get_total_cost() -> float
              

              Calculate total cost in USD across all messages.

              Only includes messages with cost information.

              Returns:

              Type Description
              float

              Total cost in USD

              Source code in src/llmling_agent/messaging/message_container.py
              78
              79
              80
              81
              82
              83
              84
              85
              86
              def get_total_cost(self) -> float:
                  """Calculate total cost in USD across all messages.

                  Only includes messages with cost information.

                  Returns:
                      Total cost in USD
                  """
                  total = 0.0
                  for msg in self:
                      info = msg.cost_info
                      if info:
                          # Only messages that carry cost information contribute.
                          total += float(info.total_cost)
                  return total
              

              to_mermaid_graph

              to_mermaid_graph(
                  message: ChatMessage[Any],
                  *,
                  title: str = "",
                  theme: str | None = None,
                  rankdir: Literal["TB", "BT", "LR", "RL"] = "LR",
              ) -> str
              

              Convert message flow to mermaid graph.

              Source code in src/llmling_agent/messaging/message_container.py
              185
              186
              187
              188
              189
              190
              191
              192
              193
              194
              195
              196
              197
              198
              199
              200
              201
              202
              203
              204
              205
              206
              207
              208
              209
              210
              211
              212
              213
              214
              215
              216
              def to_mermaid_graph(
                  self,
                  message: ChatMessage[Any],
                  *,
                  title: str = "",
                  theme: str | None = None,
                  rankdir: Literal["TB", "BT", "LR", "RL"] = "LR",
              ) -> str:
                  """Convert message flow to mermaid graph."""
                  from bigtree import dag_to_list

                  dag = self._build_flow_dag(message)
                  if not dag:
                      # No flow to render.
                      return ""

                  # Header: fenced block, optional title front-matter and theme init.
                  header = ["```mermaid"]
                  if title:
                      header += ["---", f"title: {title}", "---"]
                  if theme:
                      header.append(f'%%{{ init: {{ "theme": "{theme}" }} }}%%')
                  header.append(f"flowchart {rankdir}")

                  # One mermaid edge per parent->child connection in the DAG.
                  edges = [f"    {parent}-->{child}" for parent, child in dag_to_list(dag)]

                  return "\n".join(header + edges + ["```"])
              

              EventManager

              Manages multiple event sources and their lifecycles.

              Source code in src/llmling_agent/messaging/event_manager.py
               43
               44
               45
               46
               47
               48
               49
               50
               51
               52
               53
               54
               55
               56
               57
               58
               59
               60
               61
               62
               63
               64
               65
               66
               67
               68
               69
               70
               71
               72
               73
               74
               75
               76
               77
               78
               79
               80
               81
               82
               83
               84
               85
               86
               87
               88
               89
               90
               91
               92
               93
               94
               95
               96
               97
               98
               99
              100
              101
              102
              103
              104
              105
              106
              107
              108
              109
              110
              111
              112
              113
              114
              115
              116
              117
              118
              119
              120
              121
              122
              123
              124
              125
              126
              127
              128
              129
              130
              131
              132
              133
              134
              135
              136
              137
              138
              139
              140
              141
              142
              143
              144
              145
              146
              147
              148
              149
              150
              151
              152
              153
              154
              155
              156
              157
              158
              159
              160
              161
              162
              163
              164
              165
              166
              167
              168
              169
              170
              171
              172
              173
              174
              175
              176
              177
              178
              179
              180
              181
              182
              183
              184
              185
              186
              187
              188
              189
              190
              191
              192
              193
              194
              195
              196
              197
              198
              199
              200
              201
              202
              203
              204
              205
              206
              207
              208
              209
              210
              211
              212
              213
              214
              215
              216
              217
              218
              219
              220
              221
              222
              223
              224
              225
              226
              227
              228
              229
              230
              231
              232
              233
              234
              235
              236
              237
              238
              239
              240
              241
              242
              243
              244
              245
              246
              247
              248
              249
              250
              251
              252
              253
              254
              255
              256
              257
              258
              259
              260
              261
              262
              263
              264
              265
              266
              267
              268
              269
              270
              271
              272
              273
              274
              275
              276
              277
              278
              279
              280
              281
              282
              283
              284
              285
              286
              287
              288
              289
              290
              291
              292
              293
              294
              295
              296
              297
              298
              299
              300
              301
              302
              303
              304
              305
              306
              307
              308
              309
              310
              311
              312
              313
              314
              315
              316
              317
              318
              319
              320
              321
              322
              323
              324
              325
              326
              327
              328
              329
              330
              331
              332
              333
              334
              335
              336
              337
              338
              339
              340
              341
              342
              343
              344
              345
              346
              347
              348
              349
              350
              351
              352
              353
              354
              355
              356
              357
              358
              359
              360
              361
              362
              363
              364
              365
              366
              367
              368
              369
              370
              371
              372
              373
              374
              375
              376
              377
              378
              379
              380
              381
              382
              383
              384
              385
              386
              387
              388
              389
              390
              391
              392
              393
              394
              395
              396
              397
              398
              399
              400
              401
              402
              403
              404
              405
              406
              407
              408
              409
              410
              411
              412
              413
              414
              415
              416
              417
              418
              419
              420
              421
              422
              423
              424
              425
              426
              427
              428
              429
              430
              431
              432
              433
              434
              435
              436
              437
              438
              439
              440
              441
              442
              443
              444
              445
              446
              447
              448
              449
              450
              class EventManager:
                  """Manages multiple event sources and their lifecycles."""

                  # Signal fired after an event has been fully dispatched
                  # (callbacks plus optional auto-run handling).
                  event_processed = Signal(EventData)

                  def __init__(
                      self,
                      node: MessageEmitter[Any, Any],
                      enable_events: bool = True,
                      auto_run: bool = True,
                  ):
                      """Initialize event manager.

                      Args:
                          node: Agent to manage events for
                          enable_events: Whether to enable event processing
                          auto_run: Whether to automatically call run() for event callbacks
                      """
                      self.node = node
                      self.enabled = enable_events
                      # Active event sources keyed by their config name.
                      self._sources: dict[str, EventSource] = {}
                      # User-registered callbacks invoked for every emitted event.
                      self._callbacks: list[EventCallback] = []
                      self.auto_run = auto_run
                      # Observers registered via the poll() decorator, keyed by event type.
                      self._observers: dict[str, list[EventObserver]] = {}

                  async def _default_handler(self, event: EventData):
                      """Default event handler that converts events to node runs."""
                      if prompt := event.to_prompt():  # Only run if event provides a prompt
                          await self.node.run(prompt)

                  def add_callback(self, callback: EventCallback):
                      """Register an event callback."""
                      self._callbacks.append(callback)

                  def remove_callback(self, callback: EventCallback):
                      """Remove a previously registered callback."""
                      self._callbacks.remove(callback)

                  async def emit_event(self, event: EventData):
                      """Emit event to all callbacks and optionally handle via node.

                      A no-op when event processing is disabled. Callback errors are
                      logged but never propagate, so one failing callback cannot block
                      the others or the default handler.
                      """
                      if not self.enabled:
                          return

                      # Run custom callbacks
                      for callback in self._callbacks:
                          try:
                              result = callback(event)
                              # Callbacks may be sync or async; await only awaitables.
                              if isinstance(result, Awaitable):
                                  await result
                          except Exception:
                              logger.exception("Error in event callback", name=callback.__name__)

                      # Run default handler if enabled
                      if self.auto_run:
                          try:
                              prompt = event.to_prompt()
                              if prompt:
                                  await self.node.run(prompt)
                          except Exception:
                              logger.exception("Error in default event handler")
                      self.event_processed.emit(event)

                  async def add_file_watch(
                      self,
                      paths: str | Sequence[str],
                      *,
                      name: str | None = None,
                      extensions: list[str] | None = None,
                      ignore_paths: list[str] | None = None,
                      recursive: bool = True,
                      debounce: int = 1600,
                  ) -> EventSource:
                      """Add file system watch event source.

                      Args:
                          paths: Paths or patterns to watch
                          name: Optional source name (default: generated from paths)
                          extensions: File extensions to monitor
                          ignore_paths: Paths to ignore
                          recursive: Whether to watch subdirectories
                          debounce: Minimum time between events (ms)
                      """
                      path_list = [paths] if isinstance(paths, str) else list(paths)
                      config = FileWatchConfig(
                          name=name or f"file_watch_{len(self._sources)}",
                          paths=path_list,
                          extensions=extensions,
                          ignore_paths=ignore_paths,
                          recursive=recursive,
                          debounce=debounce,
                      )
                      return await self.add_source(config)

                  async def add_webhook(
                      self,
                      path: str,
                      *,
                      name: str | None = None,
                      port: int = 8000,
                      secret: str | None = None,
                  ) -> EventSource:
                      """Add webhook event source.

                      Args:
                          path: URL path to listen on
                          name: Optional source name
                          port: Port to listen on
                          secret: Optional secret for request validation
                      """
                      name = name or f"webhook_{len(self._sources)}"
                      config = WebhookConfig(name=name, path=path, port=port, secret=secret)
                      return await self.add_source(config)

                  async def add_timed_event(
                      self,
                      schedule: str,
                      prompt: str,
                      *,
                      name: str | None = None,
                      timezone: str | None = None,
                      skip_missed: bool = False,
                  ) -> TimeEventSource:
                      """Add time-based event source.

                      Args:
                          schedule: Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am)
                          prompt: Prompt to send when triggered
                          name: Optional source name
                          timezone: Optional timezone (system default if None)
                          skip_missed: Whether to skip missed executions
                      """
                      config = TimeEventConfig(
                          name=name or f"timed_{len(self._sources)}",
                          schedule=schedule,
                          prompt=prompt,
                          timezone=timezone,
                          skip_missed=skip_missed,
                      )
                      # add_source returns the base EventSource type; a time config
                      # presumably yields a TimeEventSource — hence the ignore.
                      return await self.add_source(config)  # type: ignore

                  async def add_email_watch(
                      self,
                      host: str,
                      username: str,
                      password: str,
                      *,
                      name: str | None = None,
                      port: int = 993,
                      folder: str = "INBOX",
                      ssl: bool = True,
                      check_interval: int = 60,
                      mark_seen: bool = True,
                      filters: dict[str, str] | None = None,
                      max_size: int | None = None,
                  ) -> EventSource:
                      """Add email monitoring event source.

                      Args:
                          host: IMAP server hostname
                          username: Email account username
                          password: Account password or app password
                          name: Optional source name
                          port: Server port (default: 993 for IMAP SSL)
                          folder: Mailbox to monitor
                          ssl: Whether to use SSL/TLS
                          check_interval: Seconds between checks
                          mark_seen: Whether to mark processed emails as seen
                          filters: Optional email filtering criteria
                          max_size: Maximum email size in bytes
                      """
                      config = EmailConfig(
                          name=name or f"email_{len(self._sources)}",
                          host=host,
                          username=username,
                          # Wrapped so the raw password never appears in reprs/logs.
                          password=SecretStr(password),
                          port=port,
                          folder=folder,
                          ssl=ssl,
                          check_interval=check_interval,
                          mark_seen=mark_seen,
                          filters=filters or {},
                          max_size=max_size,
                      )
                      return await self.add_source(config)

                  async def add_source(self, config: EventConfig) -> EventSource:
                      """Add and start a new event source.

                      Connects the source and spawns a background task that forwards
                      its events to emit_event().

                      Args:
                          config: Event source configuration

                      Raises:
                          ValueError: If source already exists or is invalid
                          RuntimeError: If connecting/starting the source fails
                      """
                      logger.debug("Setting up event source", name=config.name, type=config.type)
                      # NOTE(review): imported here rather than at module level —
                      # presumably to avoid a circular import; confirm.
                      from llmling_agent_events.base import EventSource

                      if config.name in self._sources:
                          msg = f"Event source already exists: {config.name}"
                          raise ValueError(msg)

                      try:
                          source = EventSource.from_config(config)
                          await source.connect()
                          self._sources[config.name] = source
                          # Start processing events
                          name = f"event_processor_{config.name}"
                          self.node.task_manager.create_task(self._process_events(source), name=name)
                          logger.debug("Added event source", name=config.name)
                      except Exception as e:
                          msg = "Failed to add event source"
                          logger.exception(msg, name=config.name)
                          raise RuntimeError(msg) from e
                      else:
                          return source

                  async def remove_source(self, name: str):
                      """Stop and remove an event source.

                      No-op if the name is unknown.

                      Args:
                          name: Name of source to remove
                      """
                      if source := self._sources.pop(name, None):
                          await source.disconnect()
                          logger.debug("Removed event source", name=name)

                  async def _process_events(self, source: EventSource):
                      """Process events from a source.

                      Runs until the source is exhausted or the task is cancelled.
                      Events arriving while processing is disabled are dropped.

                      Args:
                          source: Event source to process
                      """
                      try:
                          # Get the async iterator from the coroutine
                          async for event in source.events():
                              if not self.enabled:
                                  logger.debug("Event processing disabled, skipping event")
                                  continue
                              await self.emit_event(event)

                      except asyncio.CancelledError:
                          # Re-raise so task cancellation propagates normally.
                          logger.debug("Event processing cancelled")
                          raise

                      except Exception:
                          logger.exception("Error processing events")

                  async def cleanup(self):
                      """Clean up all event sources and tasks."""
                      # Disable first so in-flight processors drop further events.
                      self.enabled = False

                      for name in list(self._sources):
                          await self.remove_source(name)

                  async def __aenter__(self) -> Self:
                      """Allow using manager as async context manager."""
                      if not self.enabled:
                          return self

                      # Set up triggers from config
                      if (
                          self.node.context
                          and self.node.context.config
                          and self.node.context.config.triggers
                      ):
                          for trigger in self.node.context.config.triggers:
                              await self.add_source(trigger)

                      return self

                  async def __aexit__(self, *exc: object):
                      """Clean up when exiting context."""
                      await self.cleanup()

                  # NOTE(review): both overloads share identical parameter lists and
                  # differ only in the decorated callable's return type.
                  @overload
                  def track[T](
                      self,
                      event_name: str | None = None,
                      **event_metadata: Any,
                  ) -> Callable[[Callable[..., T]], Callable[..., T]]: ...

                  @overload
                  def track[T](
                      self,
                      event_name: str | None = None,
                      **event_metadata: Any,
                  ) -> Callable[
                      [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
                  ]: ...

                  def track(
                      self,
                      event_name: str | None = None,
                      **event_metadata: Any,
                  ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                      """Track function calls as events.

                      Wraps a sync or async function so each call emits an EventData
                      with status/duration metadata; exceptions are recorded and
                      re-raised unchanged.

                      Args:
                          event_name: Optional name for the event (defaults to function name)
                          **event_metadata: Additional metadata to include with event

                      Example:
                          @event_manager.track("user_search")
                          async def search_docs(query: str) -> list[Doc]:
                              results = await search(query)
                              return results  # This result becomes event data
                      """

                      def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                          name = event_name or func.__name__

                          @wraps(func)
                          async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                              start_time = get_now()
                              meta = {"args": args, "kwargs": kwargs, **event_metadata}
                              try:
                                  result = await func(*args, **kwargs)
                                  if self.enabled:
                                      meta |= {"status": "success", "duration": get_now() - start_time}
                                      event = EventData.create(name, content=result, metadata=meta)
                                      await self.emit_event(event)
                              except Exception as e:
                                  if self.enabled:
                                      dur = get_now() - start_time
                                      meta |= {"status": "error", "error": str(e), "duration": dur}
                                      event = EventData.create(name, content=str(e), metadata=meta)
                                      await self.emit_event(event)
                                  # Always propagate the original exception.
                                  raise
                              else:
                                  return result

                          @wraps(func)
                          def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                              start_time = get_now()
                              meta = {"args": args, "kwargs": kwargs, **event_metadata}
                              try:
                                  result = func(*args, **kwargs)
                                  if self.enabled:
                                      meta |= {"status": "success", "duration": get_now() - start_time}
                                      event = EventData.create(name, content=result, metadata=meta)
                                      # Sync caller: emit asynchronously in the background.
                                      self.node.task_manager.run_background(self.emit_event(event))
                              except Exception as e:
                                  if self.enabled:
                                      dur = get_now() - start_time
                                      meta |= {"status": "error", "error": str(e), "duration": dur}
                                      event = EventData.create(name, content=str(e), metadata=meta)
                                      self.node.task_manager.run_background(self.emit_event(event))
                                  raise
                              else:
                                  return result

                          # Pick the wrapper matching the decorated function's kind.
                          return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper

                      return decorator

                  @overload
                  def poll[T](
                      self,
                      event_type: str,
                      interval: timedelta | None = None,
                  ) -> Callable[[Callable[..., T]], Callable[..., T]]: ...

                  @overload
                  def poll[T](
                      self,
                      event_type: str,
                      interval: timedelta | None = None,
                  ) -> Callable[
                      [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
                  ]: ...

                  def poll(
                      self,
                      event_type: str,
                      interval: timedelta | None = None,
                  ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                      """Decorator to register an event observer.

                      Args:
                          event_type: Type of event to observe
                          interval: Optional polling interval for periodic checks

                      Example:
                          @event_manager.poll("file_changed")
                          async def handle_file_change(event: FileEventData):
                              await process_file(event.path)
                      """

                      def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                          observer = EventObserver(func, interval=interval)
                          self._observers.setdefault(event_type, []).append(observer)

                          @wraps(func)
                          async def wrapper(*args: Any, **kwargs: Any) -> Any:
                              result = await execute(func, *args, **kwargs)
                              # Convert result to event and emit
                              if self.enabled:
                                  typ = type(result).__name__
                                  meta = {"type": "function_result", "output_type": typ}
                                  event = FunctionResultEventData(
                                      result=result, source=event_type, metadata=meta
                                  )
                                  await self.emit_event(event)
                              return result

                          return wrapper

                      return decorator
              

              __aenter__ async

              __aenter__() -> Self
              

              Allow using manager as async context manager.

              Source code in src/llmling_agent/messaging/event_manager.py
              297
              298
              299
              300
              301
              302
              303
              304
              305
              306
              307
              308
              309
              310
              311
              async def __aenter__(self) -> Self:
                  """Allow using manager as async context manager."""
                  if self.enabled:
                      # Register any triggers declared in the node's configuration.
                      ctx = self.node.context
                      if ctx and ctx.config and ctx.config.triggers:
                          for trigger in ctx.config.triggers:
                              await self.add_source(trigger)
                  return self
              

              __aexit__ async

              __aexit__(*exc: object)
              

              Clean up when exiting context.

              Source code in src/llmling_agent/messaging/event_manager.py
              313
              314
              315
              async def __aexit__(self, *exc: object):
                  """Clean up when exiting context."""
                  # Exception info is ignored: cleanup always runs, and any active
                  # exception continues to propagate (nothing is returned/suppressed).
                  await self.cleanup()
              

              __init__

              __init__(
                  node: MessageEmitter[Any, Any], enable_events: bool = True, auto_run: bool = True
              )
              

              Initialize event manager.

              Parameters:

              Name Type Description Default
              node MessageEmitter[Any, Any]

              Agent to manage events for

              required
              enable_events bool

              Whether to enable event processing

              True
              auto_run bool

              Whether to automatically call run() for event callbacks

              True
              Source code in src/llmling_agent/messaging/event_manager.py
              48
              49
              50
              51
              52
              53
              54
              55
              56
              57
              58
              59
              60
              61
              62
              63
              64
              65
              66
              def __init__(
                  self,
                  node: MessageEmitter[Any, Any],
                  enable_events: bool = True,
                  auto_run: bool = True,
              ):
                  """Set up an event manager for the given node.

                  Args:
                      node: Agent to manage events for
                      enable_events: Whether to enable event processing
                      auto_run: Whether to automatically call run() for event callbacks
                  """
                  self.node = node
                  self.auto_run = auto_run
                  self.enabled = enable_events
                  # Internal registries: active sources, user callbacks, observers.
                  self._sources: dict[str, EventSource] = {}
                  self._callbacks: list[EventCallback] = []
                  self._observers: dict[str, list[EventObserver]] = {}
              

              add_callback

              add_callback(callback: EventCallback)
              

              Register an event callback.

              Source code in src/llmling_agent/messaging/event_manager.py
              73
              74
              75
              def add_callback(self, callback: EventCallback):
                  """Append *callback* to the list of functions invoked for each event."""
                  self._callbacks.append(callback)
              

              add_email_watch async

              add_email_watch(
                  host: str,
                  username: str,
                  password: str,
                  *,
                  name: str | None = None,
                  port: int = 993,
                  folder: str = "INBOX",
                  ssl: bool = True,
                  check_interval: int = 60,
                  mark_seen: bool = True,
                  filters: dict[str, str] | None = None,
                  max_size: int | None = None,
              ) -> EventSource
              

              Add email monitoring event source.

              Parameters:

              Name Type Description Default
              host str

              IMAP server hostname

              required
              username str

              Email account username

              required
              password str

              Account password or app password

              required
              name str | None

              Optional source name

              None
              port int

              Server port (default: 993 for IMAP SSL)

              993
              folder str

              Mailbox to monitor

              'INBOX'
              ssl bool

              Whether to use SSL/TLS

              True
              check_interval int

              Seconds between checks

              60
              mark_seen bool

              Whether to mark processed emails as seen

              True
              filters dict[str, str] | None

              Optional email filtering criteria

              None
              max_size int | None

              Maximum email size in bytes

              None
              Source code in src/llmling_agent/messaging/event_manager.py
              183
              184
              185
              186
              187
              188
              189
              190
              191
              192
              193
              194
              195
              196
              197
              198
              199
              200
              201
              202
              203
              204
              205
              206
              207
              208
              209
              210
              211
              212
              213
              214
              215
              216
              217
              218
              219
              220
              221
              222
              223
              224
              225
              226
              async def add_email_watch(
                  self,
                  host: str,
                  username: str,
                  password: str,
                  *,
                  name: str | None = None,
                  port: int = 993,
                  folder: str = "INBOX",
                  ssl: bool = True,
                  check_interval: int = 60,
                  mark_seen: bool = True,
                  filters: dict[str, str] | None = None,
                  max_size: int | None = None,
              ) -> EventSource:
                  """Create and register an IMAP mailbox watcher as an event source.

                  Args:
                      host: IMAP server hostname
                      username: Email account username
                      password: Account password or app password
                      name: Optional source name
                      port: Server port (default: 993 for IMAP SSL)
                      folder: Mailbox to monitor
                      ssl: Whether to use SSL/TLS
                      check_interval: Seconds between checks
                      mark_seen: Whether to mark processed emails as seen
                      filters: Optional email filtering criteria
                      max_size: Maximum email size in bytes
                  """
                  # Generate a unique fallback name from the current source count.
                  source_name = name or f"email_{len(self._sources)}"
                  config = EmailConfig(
                      name=source_name,
                      host=host,
                      username=username,
                      # Wrap the password so it is not leaked in reprs/logs.
                      password=SecretStr(password),
                      port=port,
                      folder=folder,
                      ssl=ssl,
                      check_interval=check_interval,
                      mark_seen=mark_seen,
                      filters=filters or {},
                      max_size=max_size,
                  )
                  return await self.add_source(config)
              

              add_file_watch async

              add_file_watch(
                  paths: str | Sequence[str],
                  *,
                  name: str | None = None,
                  extensions: list[str] | None = None,
                  ignore_paths: list[str] | None = None,
                  recursive: bool = True,
                  debounce: int = 1600,
              ) -> EventSource
              

              Add file system watch event source.

              Parameters:

              Name Type Description Default
              paths str | Sequence[str]

              Paths or patterns to watch

              required
              name str | None

              Optional source name (default: generated from paths)

              None
              extensions list[str] | None

              File extensions to monitor

              None
              ignore_paths list[str] | None

              Paths to ignore

              None
              recursive bool

              Whether to watch subdirectories

              True
              debounce int

              Minimum time between events (ms)

              1600
              Source code in src/llmling_agent/messaging/event_manager.py
              105
              106
              107
              108
              109
              110
              111
              112
              113
              114
              115
              116
              117
              118
              119
              120
              121
              122
              123
              124
              125
              126
              127
              128
              129
              130
              131
              132
              133
              134
              async def add_file_watch(
                  self,
                  paths: str | Sequence[str],
                  *,
                  name: str | None = None,
                  extensions: list[str] | None = None,
                  ignore_paths: list[str] | None = None,
                  recursive: bool = True,
                  debounce: int = 1600,
              ) -> EventSource:
                  """Register a filesystem watcher as an event source.

                  Args:
                      paths: Paths or patterns to watch
                      name: Optional source name (default: generated from paths)
                      extensions: File extensions to monitor
                      ignore_paths: Paths to ignore
                      recursive: Whether to watch subdirectories
                      debounce: Minimum time between events (ms)
                  """
                  # Accept a single path string as well as any sequence of paths.
                  if isinstance(paths, str):
                      path_list = [paths]
                  else:
                      path_list = list(paths)
                  config = FileWatchConfig(
                      name=name or f"file_watch_{len(self._sources)}",
                      paths=path_list,
                      extensions=extensions,
                      ignore_paths=ignore_paths,
                      recursive=recursive,
                      debounce=debounce,
                  )
                  return await self.add_source(config)
              

              add_source async

              add_source(config: EventConfig) -> EventSource
              

              Add and start a new event source.

              Parameters:

              Name Type Description Default
              config EventConfig

              Event source configuration

              required

              Raises:

              Type Description
              ValueError

              If source already exists or is invalid

              Source code in src/llmling_agent/messaging/event_manager.py
              228
              229
              230
              231
              232
              233
              234
              235
              236
              237
              238
              239
              240
              241
              242
              243
              244
              245
              246
              247
              248
              249
              250
              251
              252
              253
              254
              255
              256
              257
              async def add_source(self, config: EventConfig) -> EventSource:
                  """Add and start a new event source.

                  Args:
                      config: Event source configuration

                  Returns:
                      The connected and running event source.

                  Raises:
                      ValueError: If a source with the same name is already registered
                      RuntimeError: If constructing, connecting, or starting the source fails
                  """
                  logger.debug("Setting up event source", name=config.name, type=config.type)
                  # Local import — presumably avoids an import cycle at module load; confirm.
                  from llmling_agent_events.base import EventSource

                  # Names must be unique: reject duplicates before any side effects.
                  if config.name in self._sources:
                      msg = f"Event source already exists: {config.name}"
                      raise ValueError(msg)

                  try:
                      source = EventSource.from_config(config)
                      await source.connect()
                      self._sources[config.name] = source
                      # Start processing events
                      name = f"event_processor_{config.name}"
                      self.node.task_manager.create_task(self._process_events(source), name=name)
                      logger.debug("Added event source", name=config.name)
                  except Exception as e:
                      # Wrap any setup failure in RuntimeError, chaining the cause.
                      msg = "Failed to add event source"
                      logger.exception(msg, name=config.name)
                      raise RuntimeError(msg) from e
                  else:
                      return source
              

              add_timed_event async

              add_timed_event(
                  schedule: str,
                  prompt: str,
                  *,
                  name: str | None = None,
                  timezone: str | None = None,
                  skip_missed: bool = False,
              ) -> TimeEventSource
              

              Add time-based event source.

              Parameters:

              Name Type Description Default
              schedule str

              Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am)

              required
              prompt str

              Prompt to send when triggered

              required
              name str | None

              Optional source name

              None
              timezone str | None

              Optional timezone (system default if None)

              None
              skip_missed bool

              Whether to skip missed executions

              False
              Source code in src/llmling_agent/messaging/event_manager.py
              156
              157
              158
              159
              160
              161
              162
              163
              164
              165
              166
              167
              168
              169
              170
              171
              172
              173
              174
              175
              176
              177
              178
              179
              180
              181
              async def add_timed_event(
                  self,
                  schedule: str,
                  prompt: str,
                  *,
                  name: str | None = None,
                  timezone: str | None = None,
                  skip_missed: bool = False,
              ) -> TimeEventSource:
                  """Register a cron-scheduled event source.

                  Args:
                      schedule: Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am)
                      prompt: Prompt to send when triggered
                      name: Optional source name
                      timezone: Optional timezone (system default if None)
                      skip_missed: Whether to skip missed executions
                  """
                  # Fall back to a generated name based on the current source count.
                  source_name = name or f"timed_{len(self._sources)}"
                  config = TimeEventConfig(
                      name=source_name,
                      schedule=schedule,
                      prompt=prompt,
                      timezone=timezone,
                      skip_missed=skip_missed,
                  )
                  return await self.add_source(config)  # type: ignore
              

              add_webhook async

              add_webhook(
                  path: str, *, name: str | None = None, port: int = 8000, secret: str | None = None
              ) -> EventSource
              

              Add webhook event source.

              Parameters:

              Name Type Description Default
              path str

              URL path to listen on

              required
              name str | None

              Optional source name

              None
              port int

              Port to listen on

              8000
              secret str | None

              Optional secret for request validation

              None
              Source code in src/llmling_agent/messaging/event_manager.py
              136
              137
              138
              139
              140
              141
              142
              143
              144
              145
              146
              147
              148
              149
              150
              151
              152
              153
              154
              async def add_webhook(
                  self,
                  path: str,
                  *,
                  name: str | None = None,
                  port: int = 8000,
                  secret: str | None = None,
              ) -> EventSource:
                  """Register an HTTP webhook listener as an event source.

                  Args:
                      path: URL path to listen on
                      name: Optional source name
                      port: Port to listen on
                      secret: Optional secret for request validation
                  """
                  config = WebhookConfig(
                      name=name or f"webhook_{len(self._sources)}",
                      path=path,
                      port=port,
                      secret=secret,
                  )
                  return await self.add_source(config)
              

              cleanup async

              cleanup()
              

              Clean up all event sources and tasks.

              Source code in src/llmling_agent/messaging/event_manager.py
              290
              291
              292
              293
              294
              295
              async def cleanup(self):
                  """Clean up all event sources and tasks."""
                  self.enabled = False
              
                  for name in list(self._sources):
                      await self.remove_source(name)
              

              emit_event async

              emit_event(event: EventData)
              

              Emit event to all callbacks and optionally handle via node.

              Source code in src/llmling_agent/messaging/event_manager.py
               81
               82
               83
               84
               85
               86
               87
               88
               89
               90
               91
               92
               93
               94
               95
               96
               97
               98
               99
              100
              101
              102
              103
              async def emit_event(self, event: EventData):
                  """Emit event to all callbacks and optionally handle via node.

                  Args:
                      event: Event to distribute to registered callbacks and, when
                          auto_run is set, to the node as a prompt.
                  """
                  # Events are dropped silently while the manager is disabled.
                  if not self.enabled:
                      return

                  # Run custom callbacks
                  for callback in self._callbacks:
                      try:
                          result = callback(event)
                          # Callbacks may be sync or async; await only awaitable results.
                          if isinstance(result, Awaitable):
                              await result
                      except Exception:
                          # One failing callback must not keep the others from running.
                          logger.exception("Error in event callback", name=callback.__name__)

                  # Run default handler if enabled
                  if self.auto_run:
                      try:
                          prompt = event.to_prompt()
                          # A falsy prompt means the event carries nothing to run.
                          if prompt:
                              await self.node.run(prompt)
                      except Exception:
                          logger.exception("Error in default event handler")
                  # Notify listeners that processing finished (event_processed is
                  # presumably a signal declared on the class — not visible here).
                  self.event_processed.emit(event)
              

              poll

              poll(
                  event_type: str, interval: timedelta | None = None
              ) -> Callable[[Callable[..., T]], Callable[..., T]]
              
              poll(
                  event_type: str, interval: timedelta | None = None
              ) -> Callable[
                  [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
              ]
              
              poll(
                  event_type: str, interval: timedelta | None = None
              ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
              

              Decorator to register an event observer.

              Parameters:

              Name Type Description Default
              event_type str

              Type of event to observe

              required
              interval timedelta | None

              Optional polling interval for periodic checks

              None
              Example

              @event_manager.poll("file_changed") async def handle_file_change(event: FileEventData): await process_file(event.path)

              Source code in src/llmling_agent/messaging/event_manager.py
              414
              415
              416
              417
              418
              419
              420
              421
              422
              423
              424
              425
              426
              427
              428
              429
              430
              431
              432
              433
              434
              435
              436
              437
              438
              439
              440
              441
              442
              443
              444
              445
              446
              447
              448
              449
              450
              def poll(
                  self,
                  event_type: str,
                  interval: timedelta | None = None,
              ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                  """Decorator to register an event observer.

                  The decorated function is registered as an observer for *event_type*;
                  calling the returned wrapper also emits the function's result as a
                  FunctionResultEventData event while the manager is enabled.

                  Args:
                      event_type: Type of event to observe
                      interval: Optional polling interval for periodic checks

                  Example:
                      @event_manager.poll("file_changed")
                      async def handle_file_change(event: FileEventData):
                          await process_file(event.path)
                  """

                  def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                      # Registration happens at decoration time, keyed by event type.
                      observer = EventObserver(func, interval=interval)
                      self._observers.setdefault(event_type, []).append(observer)

                      @wraps(func)
                      async def wrapper(*args: Any, **kwargs: Any) -> Any:
                          # `execute` appears to run sync or async callables uniformly
                          # — confirm against its definition.
                          result = await execute(func, *args, **kwargs)
                          # Convert result to event and emit
                          if self.enabled:
                              typ = type(result).__name__
                              meta = {"type": "function_result", "output_type": typ}
                              event = FunctionResultEventData(
                                  result=result, source=event_type, metadata=meta
                              )
                              await self.emit_event(event)
                          return result

                      return wrapper

                  return decorator
              

              remove_callback

              remove_callback(callback: EventCallback)
              

              Remove a previously registered callback.

              Source code in src/llmling_agent/messaging/event_manager.py
              77
              78
              79
              def remove_callback(self, callback: EventCallback):
                  """Drop the first registered occurrence of *callback*.

                  Raises ValueError if the callback was never registered.
                  """
                  idx = self._callbacks.index(callback)
                  del self._callbacks[idx]
              

              remove_source async

              remove_source(name: str)
              

              Stop and remove an event source.

              Parameters:

              Name Type Description Default
              name str

              Name of source to remove

              required
              Source code in src/llmling_agent/messaging/event_manager.py
              259
              260
              261
              262
              263
              264
              265
              266
              267
              async def remove_source(self, name: str):
                  """Stop and remove an event source.

                  Unknown names are ignored silently.

                  Args:
                      name: Name of source to remove
                  """
                  source = self._sources.pop(name, None)
                  if source:
                      await source.disconnect()
                      logger.debug("Removed event source", name=name)
              

              track

              track(
                  event_name: str | None = None, **event_metadata: Any
              ) -> Callable[[Callable[..., T]], Callable[..., T]]
              
              track(
                  event_name: str | None = None, **event_metadata: Any
              ) -> Callable[
                  [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
              ]
              
              track(
                  event_name: str | None = None, **event_metadata: Any
              ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
              

              Track function calls as events.

              Parameters:

              Name Type Description Default
              event_name str | None

              Optional name for the event (defaults to function name)

              None
              **event_metadata Any

              Additional metadata to include with event

              {}
              Example

              @event_manager.track("user_search") async def search_docs(query: str) -> list[Doc]: results = await search(query) return results # This result becomes event data

              Source code in src/llmling_agent/messaging/event_manager.py
              333
              334
              335
              336
              337
              338
              339
              340
              341
              342
              343
              344
              345
              346
              347
              348
              349
              350
              351
              352
              353
              354
              355
              356
              357
              358
              359
              360
              361
              362
              363
              364
              365
              366
              367
              368
              369
              370
              371
              372
              373
              374
              375
              376
              377
              378
              379
              380
              381
              382
              383
              384
              385
              386
              387
              388
              389
              390
              391
              392
              393
              394
              395
              396
              def track(
                  self,
                  event_name: str | None = None,
                  **event_metadata: Any,
              ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                  """Track function calls as events.

                  Works for both sync and async functions; the wrapper emits one event
                  per call describing its outcome (success or error) plus timing.

                  Args:
                      event_name: Optional name for the event (defaults to function name)
                      **event_metadata: Additional metadata to include with event

                  Example:
                      @event_manager.track("user_search")
                      async def search_docs(query: str) -> list[Doc]:
                          results = await search(query)
                          return results  # This result becomes event data
                  """

                  def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                      name = event_name or func.__name__

                      @wraps(func)
                      async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                          start_time = get_now()
                          # Call arguments always go into the event metadata.
                          meta = {"args": args, "kwargs": kwargs, **event_metadata}
                          try:
                              result = await func(*args, **kwargs)
                              if self.enabled:
                                  meta |= {"status": "success", "duration": get_now() - start_time}
                                  event = EventData.create(name, content=result, metadata=meta)
                                  await self.emit_event(event)
                          except Exception as e:
                              if self.enabled:
                                  dur = get_now() - start_time
                                  meta |= {"status": "error", "error": str(e), "duration": dur}
                                  event = EventData.create(name, content=str(e), metadata=meta)
                                  await self.emit_event(event)
                              # Re-raise: tracking must not swallow the original error.
                              raise
                          else:
                              return result

                      @wraps(func)
                      def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                          start_time = get_now()
                          meta = {"args": args, "kwargs": kwargs, **event_metadata}
                          try:
                              result = func(*args, **kwargs)
                              if self.enabled:
                                  meta |= {"status": "success", "duration": get_now() - start_time}
                                  event = EventData.create(name, content=result, metadata=meta)
                                  # Sync context: schedule emission instead of awaiting it.
                                  self.node.task_manager.run_background(self.emit_event(event))
                          except Exception as e:
                              if self.enabled:
                                  dur = get_now() - start_time
                                  meta |= {"status": "error", "error": str(e), "duration": dur}
                                  event = EventData.create(name, content=str(e), metadata=meta)
                                  self.node.task_manager.run_background(self.emit_event(event))
                              raise
                          else:
                              return result

                      # Pick the wrapper matching the decorated function's nature.
                      return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper

                  return decorator
              

              MessageNode

              Bases: MessageEmitter[TDeps, TResult]

              Base class for all message processing nodes.

              Source code in src/llmling_agent/messaging/messagenode.py
               31
               32
               33
               34
               35
               36
               37
               38
               39
               40
               41
               42
               43
               44
               45
               46
               47
               48
               49
               50
               51
               52
               53
               54
               55
               56
               57
               58
               59
               60
               61
               62
               63
               64
               65
               66
               67
               68
               69
               70
               71
               72
               73
               74
               75
               76
               77
               78
               79
               80
               81
               82
               83
               84
               85
               86
               87
               88
               89
               90
               91
               92
               93
               94
               95
               96
               97
               98
               99
              100
              101
              102
              103
              104
              105
              106
              107
              108
              109
              110
              111
              112
              113
              114
              115
              116
              117
              118
              119
              120
              121
              122
              123
              124
              125
              126
              127
              128
              129
              130
              131
              132
              133
              134
              135
              136
              137
              138
              139
              140
              141
              142
              143
              144
              145
              146
              147
              148
              149
              150
              151
              152
              153
              154
              155
              156
              157
              158
              159
              160
              161
              162
              163
              class MessageNode[TDeps, TResult](MessageEmitter[TDeps, TResult]):
                  """Base class for all message processing nodes."""
              
                  tool_used = Signal(ToolCallInfo)
                  """Signal emitted when node uses a tool."""
              
                  async def pre_run(
                      self,
                      *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
                  ) -> tuple[ChatMessage[Any], list[Content | str]]:
                      """Hook to prepare a MessgeNode run call.
              
                      Args:
                          *prompt: The prompt(s) to prepare.
              
                      Returns:
                          A tuple of:
                              - Either incoming message, or a constructed incoming message based
                                on the prompt(s).
                              - A list of prompts to be sent to the model.
                      """
                      if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                          user_msg = prompt[0]
                          prompts = await convert_prompts([user_msg.content])
                          # Update received message's chain to show it came through its source
                          user_msg = user_msg.forwarded(prompt[0]).to_request()
                          # clear cost info to avoid double-counting
                          final_prompt = "\n\n".join(str(p) for p in prompts)
                      else:
                          prompts = await convert_prompts(prompt)
                          final_prompt = "\n\n".join(str(p) for p in prompts)
                          # use format_prompts?
                          user_msg = ChatMessage[str](
                              content=final_prompt,
                              role="user",
                              conversation_id=str(uuid4()),
                              parts=[
                                  UserPromptPart(content=[content_to_pydantic_ai(i) for i in prompts])
                              ],
                          )
                      self.message_received.emit(user_msg)
                      self.context.current_prompt = final_prompt
                      return user_msg, prompts
              
                  # async def post_run(
                  #     self,
                  #     message: ChatMessage[TResult],
                  #     previous_message: ChatMessage[Any] | None,
                  #     wait_for_connections: bool | None = None,
                  # ) -> ChatMessage[Any]:
                  #     # For chain processing, update the response's chain
                  #     if previous_message:
                  #         message = message.forwarded(previous_message)
                  #         conversation_id = previous_message.conversation_id
                  #     else:
                  #         conversation_id = str(uuid4())
                  #     # Set conversation_id on response message
                  #     message = replace(message, conversation_id=conversation_id)
                  #     self.message_sent.emit(message)
                  #     await self.log_message(response_msg)
                  #     await self.connections.route_message(message, wait=wait_for_connections)
                  #     return message
              
                  # @overload
                  # async def run(
                  #     self,
                  #     *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
                  #     wait_for_connections: bool | None = None,
                  #     store_history: bool = True,
                  #     output_type: None,
                  #     **kwargs: Any,
                  # ) -> ChatMessage[TResult]: ...
              
                  # @overload
                  # async def run[OutputTypeT](
                  #     self,
                  #     *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
                  #     wait_for_connections: bool | None = None,
                  #     store_history: bool = True,
                  #     output_type: type[OutputTypeT],
                  #     **kwargs: Any,
                  # ) -> ChatMessage[OutputTypeT]: ...
              
                  @method_spawner
                  async def run[OutputTypeT](
                      self,
                      *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
                      wait_for_connections: bool | None = None,
                      store_history: bool = True,
                      output_type: type[OutputTypeT] | None = None,
                      **kwargs: Any,
                  ) -> ChatMessage[Any]:
                      """Execute node with prompts and handle message routing.
              
                      Args:
                          prompt: Input prompts
                          wait_for_connections: Whether to wait for forwarded messages
                          store_history: Whether to store in conversation history
                          output_type: Type of output to expect
                          **kwargs: Additional arguments for _run
                      """
                      from llmling_agent import Agent
              
                      user_msg, prompts = await self.pre_run(*prompt)
                      message = await self._run(
                          *prompts,
                          store_history=store_history,
                          conversation_id=user_msg.conversation_id,
                          output_type=output_type,
                          **kwargs,
                      )
              
                      # For chain processing, update the response's chain
                      if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                          message = message.forwarded(prompt[0])
              
                      if store_history and isinstance(self, Agent):
                          self.conversation.add_chat_messages([user_msg, message])
                      self.message_sent.emit(message)
                      await self.connections.route_message(message, wait=wait_for_connections)
                      return message
              
                  @abstractmethod
                  async def get_stats(self) -> MessageStats | AggregatedMessageStats:
                      """Get message statistics for this node."""
              
                  @abstractmethod
                  def run_iter(
                      self,
                      *prompts: Any,
                      **kwargs: Any,
                  ) -> AsyncIterator[ChatMessage[Any]]:
                      """Yield messages during execution."""
              

              tool_used class-attribute instance-attribute

              tool_used = Signal(ToolCallInfo)
              

              Signal emitted when node uses a tool.

              get_stats abstractmethod async

              Get message statistics for this node.

              Source code in src/llmling_agent/messaging/messagenode.py
              153
              154
              155
               # Implementations return either single-node or aggregated statistics.
               @abstractmethod
               async def get_stats(self) -> MessageStats | AggregatedMessageStats:
                   """Get message statistics for this node."""
              

              pre_run async

              pre_run(
                  *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
              ) -> tuple[ChatMessage[Any], list[Content | str]]
              

              Hook to prepare a MessageNode run call.

              Parameters:

              Name Type Description Default
              *prompt AnyPromptType | Image | PathLike[str] | ChatMessage

              The prompt(s) to prepare.

              ()

              Returns:

              Type Description
              tuple[ChatMessage[Any], list[Content | str]]

              A tuple of: - Either incoming message, or a constructed incoming message based on the prompt(s). - A list of prompts to be sent to the model.

              Source code in src/llmling_agent/messaging/messagenode.py
              37
              38
              39
              40
              41
              42
              43
              44
              45
              46
              47
              48
              49
              50
              51
              52
              53
              54
              55
              56
              57
              58
              59
              60
              61
              62
              63
              64
              65
              66
              67
              68
              69
              70
              71
              72
              73
               async def pre_run(
                   self,
                   *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
               ) -> tuple[ChatMessage[Any], list[Content | str]]:
                   """Hook to prepare a MessageNode run call.

                   Args:
                       *prompt: The prompt(s) to prepare.

                   Returns:
                       A tuple of:
                           - Either incoming message, or a constructed incoming message based
                             on the prompt(s).
                           - A list of prompts to be sent to the model.
                   """
                   # Single ChatMessage argument: reuse it as the incoming message.
                   if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                       user_msg = prompt[0]
                       prompts = await convert_prompts([user_msg.content])
                       # Update received message's chain to show it came through its source
                       # NOTE(review): user_msg IS prompt[0] here, so this forwards the
                       # message to itself — confirm the chain update is intended.
                       user_msg = user_msg.forwarded(prompt[0]).to_request()
                       # clear cost info to avoid double-counting
                       final_prompt = "\n\n".join(str(p) for p in prompts)
                   else:
                       # Arbitrary prompt parts: convert them and wrap in a fresh user
                       # message carrying a new conversation id.
                       prompts = await convert_prompts(prompt)
                       final_prompt = "\n\n".join(str(p) for p in prompts)
                       # use format_prompts?
                       user_msg = ChatMessage[str](
                           content=final_prompt,
                           role="user",
                           conversation_id=str(uuid4()),
                           parts=[
                               UserPromptPart(content=[content_to_pydantic_ai(i) for i in prompts])
                           ],
                       )
                   # Notify listeners, then record the flattened prompt on the context.
                   self.message_received.emit(user_msg)
                   self.context.current_prompt = final_prompt
                   return user_msg, prompts
              

              run async

              run(
                  *prompt: AnyPromptType | Image | PathLike[str] | ChatMessage,
                  wait_for_connections: bool | None = None,
                  store_history: bool = True,
                  output_type: type[OutputTypeT] | None = None,
                  **kwargs: Any,
              ) -> ChatMessage[Any]
              

              Execute node with prompts and handle message routing.

              Parameters:

              Name Type Description Default
              prompt AnyPromptType | Image | PathLike[str] | ChatMessage

              Input prompts

              ()
              wait_for_connections bool | None

              Whether to wait for forwarded messages

              None
              store_history bool

              Whether to store in conversation history

              True
              output_type type[OutputTypeT] | None

              Type of output to expect

              None
              **kwargs Any

              Additional arguments for _run

              {}
              Source code in src/llmling_agent/messaging/messagenode.py
              114
              115
              116
              117
              118
              119
              120
              121
              122
              123
              124
              125
              126
              127
              128
              129
              130
              131
              132
              133
              134
              135
              136
              137
              138
              139
              140
              141
              142
              143
              144
              145
              146
              147
              148
              149
              150
              151
               @method_spawner
               async def run[OutputTypeT](
                   self,
                   *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str] | ChatMessage,
                   wait_for_connections: bool | None = None,
                   store_history: bool = True,
                   output_type: type[OutputTypeT] | None = None,
                   **kwargs: Any,
               ) -> ChatMessage[Any]:
                   """Execute node with prompts and handle message routing.

                   Args:
                       prompt: Input prompts
                       wait_for_connections: Whether to wait for forwarded messages
                       store_history: Whether to store in conversation history
                       output_type: Type of output to expect
                       **kwargs: Additional arguments for _run
                   """
                   # Local import — presumably avoids a circular import; confirm.
                   from llmling_agent import Agent

                   user_msg, prompts = await self.pre_run(*prompt)
                   message = await self._run(
                       *prompts,
                       store_history=store_history,
                       conversation_id=user_msg.conversation_id,
                       output_type=output_type,
                       **kwargs,
                   )

                   # For chain processing, update the response's chain
                   if len(prompt) == 1 and isinstance(prompt[0], ChatMessage):
                       message = message.forwarded(prompt[0])

                   # History is persisted only when self is an Agent.
                   if store_history and isinstance(self, Agent):
                       self.conversation.add_chat_messages([user_msg, message])
                   self.message_sent.emit(message)
                   await self.connections.route_message(message, wait=wait_for_connections)
                   return message
              

              run_iter abstractmethod

              run_iter(*prompts: Any, **kwargs: Any) -> AsyncIterator[ChatMessage[Any]]
              

              Yield messages during execution.

              Source code in src/llmling_agent/messaging/messagenode.py
              157
              158
              159
              160
              161
              162
              163
               # Declared sync but annotated to return an AsyncIterator — implementations
               # are presumably async generators; confirm against subclasses.
               @abstractmethod
               def run_iter(
                   self,
                   *prompts: Any,
                   **kwargs: Any,
               ) -> AsyncIterator[ChatMessage[Any]]:
                   """Yield messages during execution."""
              

              TeamResponse

              Bases: list[AgentResponse[Any]]

              Results from a team execution.

              Source code in src/llmling_agent/messaging/messages.py
              446
              447
              448
              449
              450
              451
              452
              453
              454
              455
              456
              457
              458
              459
              460
              461
              462
              463
              464
              465
              466
              467
              468
              469
              470
              471
              472
              473
              474
              475
              476
              477
              478
              479
              480
              481
              482
              483
              484
              485
              486
              487
              488
              489
              490
              491
              492
              493
              494
              495
              496
              497
              498
              499
              class TeamResponse[TMessageContent](list[AgentResponse[Any]]):
                  """Results from a team execution."""
              
                  def __init__(
                      self,
                      responses: list[AgentResponse[TMessageContent]],
                      start_time: datetime | None = None,
                      errors: dict[str, Exception] | None = None,
                  ):
                      super().__init__(responses)
                      self.start_time = start_time or get_now()
                      self.end_time = get_now()
                      self.errors = errors or {}
              
                  @property
                  def duration(self) -> float:
                      """Get execution duration in seconds."""
                      return (self.end_time - self.start_time).total_seconds()
              
                  @property
                  def success(self) -> bool:
                      """Whether all agents completed successfully."""
                      return not bool(self.errors)
              
                  @property
                  def failed_agents(self) -> list[str]:
                      """Names of agents that failed."""
                      return list(self.errors.keys())
              
                  def by_agent(self, name: str) -> AgentResponse[TMessageContent] | None:
                      """Get response from specific agent."""
                      return next((r for r in self if r.agent_name == name), None)
              
                  def format_durations(self) -> str:
                      """Format execution times."""
                      parts = [f"{r.agent_name}: {r.timing:.2f}s" for r in self if r.timing is not None]
                      return f"Individual times: {', '.join(parts)}\nTotal time: {self.duration:.2f}s"
              
                  # TODO: could keep TResultContent for len(messages) == 1
                  def to_chat_message(self) -> ChatMessage[str]:
                      """Convert team response to a single chat message."""
                      # Combine all responses into one structured message
                      content = "\n\n".join(
                          f"[{response.agent_name}]: {response.message.content}"
                          for response in self
                          if response.message
                      )
                      meta = {
                          "type": "team_response",
                          "agents": [r.agent_name for r in self],
                          "duration": self.duration,
                          "success_count": len(self),
                      }
                      return ChatMessage(content=content, role="assistant", metadata=meta)  # type: ignore
              

              duration property

              duration: float
              

              Get execution duration in seconds.

              failed_agents property

              failed_agents: list[str]
              

              Names of agents that failed.

              success property

              success: bool
              

              Whether all agents completed successfully.

              by_agent

              by_agent(name: str) -> AgentResponse[TMessageContent] | None
              

              Get response from specific agent.

              Source code in src/llmling_agent/messaging/messages.py
              475
              476
              477
               # First match wins; agent names are presumably unique per team — confirm.
               def by_agent(self, name: str) -> AgentResponse[TMessageContent] | None:
                   """Get response from specific agent."""
                   return next((r for r in self if r.agent_name == name), None)
              

              format_durations

              format_durations() -> str
              

              Format execution times.

              Source code in src/llmling_agent/messaging/messages.py
              479
              480
              481
              482
              def format_durations(self) -> str:
                  """Format execution times."""
                  parts = [f"{r.agent_name}: {r.timing:.2f}s" for r in self if r.timing is not None]
                  return f"Individual times: {', '.join(parts)}\nTotal time: {self.duration:.2f}s"
              

              to_chat_message

              to_chat_message() -> ChatMessage[str]
              

              Convert team response to a single chat message.

              Source code in src/llmling_agent/messaging/messages.py
              485
              486
              487
              488
              489
              490
              491
              492
              493
              494
              495
              496
              497
              498
              499
               def to_chat_message(self) -> ChatMessage[str]:
                   """Convert team response to a single chat message."""
                   # Combine all responses into one structured message
                   content = "\n\n".join(
                       f"[{response.agent_name}]: {response.message.content}"
                       for response in self
                       if response.message
                   )
                   meta = {
                       "type": "team_response",
                       "agents": [r.agent_name for r in self],
                       "duration": self.duration,
                       # NOTE(review): this counts every response, including ones with no
                       # message — confirm "success_count" should equal len(self).
                       "success_count": len(self),
                   }
                   return ChatMessage(content=content, role="assistant", metadata=meta)  # type: ignore
              

              TokenCost dataclass

              Combined token and cost tracking.

              Source code in src/llmling_agent/messaging/messages.py
              100
              101
              102
              103
              104
              105
              106
              107
              108
              109
              110
              111
              112
              113
              114
              115
              116
              117
              118
              119
              120
              121
              122
              123
              124
              125
              126
              127
              128
              129
              130
              131
              132
              133
              134
              135
              136
              137
              138
              139
              140
              141
              142
              143
              144
              145
              146
              147
              148
              @dataclass(frozen=True)
              class TokenCost:
                  """Combined token and cost tracking."""
              
                  token_usage: RunUsage
                  """Token counts for prompt and completion"""
                  total_cost: Decimal
                  """Total cost in USD"""
              
                  @classmethod
                  async def from_usage(cls, usage: RunUsage | None, model: str) -> TokenCost | None:
                      """Create result from usage data.
              
                      Args:
                          usage: Token counts from model response
                          model: Name of the model used
              
              
                      Returns:
                          TokenCost if usage data available, None otherwise
                      """
                      if not (
                          usage and usage.input_tokens is not None and usage.output_tokens is not None
                      ):
                          logger.debug("Missing token counts in Usage object")
                          return None
                      logger.debug("Token usage", usage=usage)
              
                      # return cls(token_usage=token_usage, total_cost=Decimal(total_cost))
                      if model in {"None", "test"}:
                          price = Decimal(0)
                      else:
                          parts = model.split(":", 1)
                          try:
                              price_data = calc_price(
                                  usage,
                                  model_ref=parts[1] if len(parts) > 1 else parts[0],
                                  provider_id=parts[0] if len(parts) > 1 else "openai",
                              )
                              price = price_data.total_price
                          except Exception:  # noqa: BLE001
                              cost = await tokonomics.calculate_token_cost(
                                  model,
                                  usage.input_tokens,
                                  usage.output_tokens,
                              )
                              price = Decimal(cost.total_cost if cost else 0)
              
                      return cls(token_usage=usage, total_cost=price)
              

              token_usage instance-attribute

              token_usage: RunUsage
              

              Token counts for prompt and completion

              total_cost instance-attribute

              total_cost: Decimal
              

              Total cost in USD

              from_usage async classmethod

              from_usage(usage: RunUsage | None, model: str) -> TokenCost | None
              

              Create result from usage data.

              Parameters:

              Name Type Description Default
              usage RunUsage | None

              Token counts from model response

              required
              model str

              Name of the model used

              required

              Returns:

              Type Description
              TokenCost | None

              TokenCost if usage data available, None otherwise

              Source code in src/llmling_agent/messaging/messages.py
              109
              110
              111
              112
              113
              114
              115
              116
              117
              118
              119
              120
              121
              122
              123
              124
              125
              126
              127
              128
              129
              130
              131
              132
              133
              134
              135
              136
              137
              138
              139
              140
              141
              142
              143
              144
              145
              146
              147
              148
               @classmethod
               async def from_usage(cls, usage: RunUsage | None, model: str) -> TokenCost | None:
                   """Create result from usage data.

                   Args:
                       usage: Token counts from model response
                       model: Name of the model used


                   Returns:
                       TokenCost if usage data available, None otherwise
                   """
                   # Both input and output token counts must be present.
                   if not (
                       usage and usage.input_tokens is not None and usage.output_tokens is not None
                   ):
                       logger.debug("Missing token counts in Usage object")
                       return None
                   logger.debug("Token usage", usage=usage)

                   # return cls(token_usage=token_usage, total_cost=Decimal(total_cost))
                   # Sentinel model names carry no cost.
                   if model in {"None", "test"}:
                       price = Decimal(0)
                   else:
                       # Model ids may be "provider:model"; bare ids default to openai.
                       parts = model.split(":", 1)
                       try:
                           price_data = calc_price(
                               usage,
                               model_ref=parts[1] if len(parts) > 1 else parts[0],
                               provider_id=parts[0] if len(parts) > 1 else "openai",
                           )
                           price = price_data.total_price
                       except Exception:  # noqa: BLE001
                           # Fall back to tokonomics-based pricing on any failure.
                           cost = await tokonomics.calculate_token_cost(
                               model,
                               usage.input_tokens,
                               usage.output_tokens,
                           )
                           # NOTE(review): Decimal(float) keeps binary artifacts;
                           # Decimal(str(...)) would be exact — confirm intent.
                           price = Decimal(cost.total_cost if cost else 0)

                   return cls(token_usage=usage, total_cost=price)