Skip to content

event_manager

Class info

Classes

Name Children Inherits
EventManager
llmling_agent.messaging.event_manager
Manages multiple event sources and their lifecycles.
    EventObserver
    llmling_agent.messaging.event_manager
    Registered event observer.

      🛈 DocStrings

      Event manager for handling multiple event sources.

      EventManager

      Manages multiple event sources and their lifecycles.

      Source code in src/llmling_agent/messaging/event_manager.py
       41
       42
       43
       44
       45
       46
       47
       48
       49
       50
       51
       52
       53
       54
       55
       56
       57
       58
       59
       60
       61
       62
       63
       64
       65
       66
       67
       68
       69
       70
       71
       72
       73
       74
       75
       76
       77
       78
       79
       80
       81
       82
       83
       84
       85
       86
       87
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      169
      170
      171
      172
      173
      174
      175
      176
      177
      178
      179
      180
      181
      182
      183
      184
      185
      186
      187
      188
      189
      190
      191
      192
      193
      194
      195
      196
      197
      198
      199
      200
      201
      202
      203
      204
      205
      206
      207
      208
      209
      210
      211
      212
      213
      214
      215
      216
      217
      218
      219
      220
      221
      222
      223
      224
      225
      226
      227
      228
      229
      230
      231
      232
      233
      234
      235
      236
      237
      238
      239
      240
      241
      242
      243
      244
      245
      246
      247
      248
      249
      250
      251
      252
      253
      254
      255
      256
      257
      258
      259
      260
      261
      262
      263
      264
      265
      266
      267
      268
      269
      270
      271
      272
      273
      274
      275
      276
      277
      278
      279
      280
      281
      282
      283
      284
      285
      286
      287
      288
      289
      290
      291
      292
      293
      294
      295
      296
      297
      298
      299
      300
      301
      302
      303
      304
      305
      306
      307
      308
      309
      310
      311
      312
      313
      314
      315
      316
      317
      318
      319
      320
      321
      322
      323
      324
      325
      326
      327
      328
      329
      330
      331
      332
      333
      334
      335
      336
      337
      338
      339
      340
      341
      342
      343
      344
      345
      346
      347
      348
      349
      350
      351
      352
      353
      354
      355
      356
      357
      358
      359
      360
      361
      362
      363
      364
      365
      366
      367
      368
      369
      370
      371
      372
      373
      374
      375
      376
      377
      378
      379
      380
      381
      382
      383
      384
      385
      386
      387
      388
      389
      390
      391
      392
      393
      394
      395
      396
      397
      398
      399
      400
      401
      402
      403
      404
      405
      406
      407
      408
      409
      410
      411
      412
      413
      414
      415
      416
      417
      418
      419
      420
      421
      422
      class EventManager:
          """Manages multiple event sources and their lifecycles."""
      
          event_processed = Signal(EventData)
      
          def __init__(
              self,
              node: MessageNode[Any, Any],
              enable_events: bool = True,
              auto_run: bool = True,
          ) -> None:
              """Initialize event manager.
      
              Args:
                  node: Agent to manage events for
                  enable_events: Whether to enable event processing
                  auto_run: Whether to automatically call run() for event callbacks
              """
              self.node = node
              self.enabled = enable_events
              self._sources: dict[str, EventSource] = {}
              self._callbacks: list[EventCallback] = []
              self.auto_run = auto_run
              self._observers = defaultdict[str, list[EventObserver]](list)
      
          async def _default_handler(self, event: EventData) -> None:
              """Default event handler that converts events to node runs."""
              if prompt := event.to_prompt():  # Only run if event provides a prompt
                  await self.node.run(prompt)
      
          def add_callback(self, callback: EventCallback) -> None:
              """Register an event callback."""
              self._callbacks.append(callback)
      
          def remove_callback(self, callback: EventCallback) -> None:
              """Remove a previously registered callback."""
              self._callbacks.remove(callback)
      
          async def emit_event(self, event: EventData) -> None:
              """Emit event to all callbacks and optionally handle via node."""
              if not self.enabled:
                  return
      
              # Run custom callbacks
              for callback in self._callbacks:
                  try:
                      result = callback(event)
                      if isinstance(result, Awaitable):
                          await result
                  except Exception:
                      logger.exception("Error in event callback", name=callback.__name__)
      
              # Run default handler if enabled
              if self.auto_run:
                  try:
                      prompt = event.to_prompt()
                      if prompt:
                          await self.node.run(prompt)
                  except Exception:
                      logger.exception("Error in default event handler")
              self.event_processed.emit(event)
      
          async def add_file_watch(
              self,
              paths: str | Sequence[str],
              *,
              name: str | None = None,
              extensions: list[str] | None = None,
              ignore_paths: list[str] | None = None,
              recursive: bool = True,
              debounce: int = 1600,
          ) -> EventSource:
              """Add file system watch event source.
      
              Args:
                  paths: Paths or patterns to watch
                  name: Optional source name (default: generated from paths)
                  extensions: File extensions to monitor
                  ignore_paths: Paths to ignore
                  recursive: Whether to watch subdirectories
                  debounce: Minimum time between events (ms)
              """
              path_list = [paths] if isinstance(paths, str) else list(paths)
              config = FileWatchConfig(
                  name=name or f"file_watch_{len(self._sources)}",
                  paths=path_list,
                  extensions=extensions,
                  ignore_paths=ignore_paths,
                  recursive=recursive,
                  debounce=debounce,
              )
              return await self.add_source(config)
      
          async def add_webhook(
              self,
              path: str,
              *,
              name: str | None = None,
              port: int = 8000,
              secret: str | None = None,
          ) -> EventSource:
              """Add webhook event source.
      
              Args:
                  path: URL path to listen on
                  name: Optional source name
                  port: Port to listen on
                  secret: Optional secret for request validation
              """
              name = name or f"webhook_{len(self._sources)}"
              sec = SecretStr(secret) if secret else None
              config = WebhookConfig(name=name, path=path, port=port, secret=sec)
              return await self.add_source(config)
      
          async def add_timed_event(
              self,
              schedule: str,
              prompt: str,
              *,
              name: str | None = None,
              timezone: str | None = None,
              skip_missed: bool = False,
          ) -> TimeEventSource:
              """Add time-based event source.
      
              Args:
                  schedule: Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am)
                  prompt: Prompt to send when triggered
                  name: Optional source name
                  timezone: Optional timezone (system default if None)
                  skip_missed: Whether to skip missed executions
              """
              config = TimeEventConfig(
                  name=name or f"timed_{len(self._sources)}",
                  schedule=schedule,
                  prompt=prompt,
                  timezone=timezone,
                  skip_missed=skip_missed,
              )
              return await self.add_source(config)  # type: ignore
      
          async def add_email_watch(
              self,
              host: str,
              username: str,
              password: str,
              *,
              name: str | None = None,
              port: int = 993,
              folder: str = "INBOX",
              ssl: bool = True,
              check_interval: int = 60,
              mark_seen: bool = True,
              filters: dict[str, str] | None = None,
              max_size: int | None = None,
          ) -> EventSource:
              """Add email monitoring event source.
      
              Args:
                  host: IMAP server hostname
                  username: Email account username
                  password: Account password or app password
                  name: Optional source name
                  port: Server port (default: 993 for IMAP SSL)
                  folder: Mailbox to monitor
                  ssl: Whether to use SSL/TLS
                  check_interval: Seconds between checks
                  mark_seen: Whether to mark processed emails as seen
                  filters: Optional email filtering criteria
                  max_size: Maximum email size in bytes
              """
              config = EmailConfig(
                  name=name or f"email_{len(self._sources)}",
                  host=host,
                  username=username,
                  password=SecretStr(password),
                  port=port,
                  folder=folder,
                  ssl=ssl,
                  check_interval=check_interval,
                  mark_seen=mark_seen,
                  filters=filters or {},
                  max_size=max_size,
              )
              return await self.add_source(config)
      
          async def add_source(self, config: EventConfig) -> EventSource:
              """Add and start a new event source.
      
              Args:
                  config: Event source configuration
      
              Raises:
                  ValueError: If source already exists or is invalid
              """
              logger.debug("Setting up event source", name=config.name, type=config.type)
              from evented.base import EventSource
      
              if config.name in self._sources:
                  msg = f"Event source already exists: {config.name}"
                  raise ValueError(msg)
      
              try:
                  source = EventSource.from_config(config)
                  await source.__aenter__()
                  self._sources[config.name] = source
                  # Start processing events
                  name = f"event_processor_{config.name}"
                  self.node.task_manager.create_task(self._process_events(source), name=name)
                  logger.debug("Added event source", name=config.name)
              except Exception as e:
                  msg = "Failed to add event source"
                  logger.exception(msg, name=config.name)
                  raise RuntimeError(msg) from e
              else:
                  return source
      
          async def remove_source(self, name: str) -> None:
              """Stop and remove an event source.
      
              Args:
                  name: Name of source to remove
              """
              if source := self._sources.pop(name, None):
                  await source.__aexit__(None, None, None)
                  logger.debug("Removed event source", name=name)
      
          async def _process_events(self, source: EventSource) -> None:
              """Process events from a source.
      
              Args:
                  source: Event source to process
              """
              try:
                  # Get the async iterator from the coroutine
                  async for event in source.events():
                      if not self.enabled:
                          logger.debug("Event processing disabled, skipping event")
                          continue
                      await self.emit_event(event)
      
              except asyncio.CancelledError:
                  logger.debug("Event processing cancelled")
                  raise
      
              except Exception:
                  logger.exception("Error processing events")
      
          async def cleanup(self) -> None:
              """Clean up all event sources and tasks."""
              self.enabled = False
      
              for name in list(self._sources):
                  await self.remove_source(name)
      
          async def __aenter__(self) -> Self:
              """Allow using manager as async context manager."""
              if not self.enabled:
                  return self
      
              # Set up triggers from config
              if self.node.context and self.node.context.config and self.node.context.config.triggers:
                  for trigger in self.node.context.config.triggers:
                      await self.add_source(trigger)
      
              return self
      
          async def __aexit__(
              self,
              exc_type: type[BaseException] | None,
              exc_val: BaseException | None,
              exc_tb: TracebackType | None,
          ) -> None:
              """Clean up when exiting context."""
              await self.cleanup()
      
          def track[T](
              self,
              event_name: str | None = None,
              **event_metadata: Any,
          ) -> (
              Callable[[Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]]
              | Callable[[Callable[..., T]], Callable[..., T]]
          ):
              """Track function calls as events.
      
              Args:
                  event_name: Optional name for the event (defaults to function name)
                  **event_metadata: Additional metadata to include with event
      
              Example:
                  @event_manager.track("user_search")
                  async def search_docs(query: str) -> list[Doc]:
                      results = await search(query)
                      return results  # This result becomes event data
              """
      
              def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                  name = event_name or func.__name__
      
                  @wraps(func)
                  async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                      start_time = get_now()
                      meta = {"args": args, "kwargs": kwargs, **event_metadata}
                      try:
                          result = await func(*args, **kwargs)
                          if self.enabled:
                              meta |= {"status": "success", "duration": get_now() - start_time}
                              event = EventData.create(name, content=result, metadata=meta)
                              await self.emit_event(event)
                      except Exception as e:
                          if self.enabled:
                              dur = get_now() - start_time
                              meta |= {"status": "error", "error": str(e), "duration": dur}
                              event = EventData.create(name, content=str(e), metadata=meta)
                              await self.emit_event(event)
                          raise
                      else:
                          return result
      
                  @wraps(func)
                  def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                      start_time = get_now()
                      meta = {"args": args, "kwargs": kwargs, **event_metadata}
                      try:
                          result = func(*args, **kwargs)
                          if self.enabled:
                              meta |= {"status": "success", "duration": get_now() - start_time}
                              event = EventData.create(name, content=result, metadata=meta)
                              self.node.task_manager.run_background(self.emit_event(event))
                      except Exception as e:
                          if self.enabled:
                              dur = get_now() - start_time
                              meta |= {"status": "error", "error": str(e), "duration": dur}
                              event = EventData.create(name, content=str(e), metadata=meta)
                              self.node.task_manager.run_background(self.emit_event(event))
                          raise
                      else:
                          return result
      
                  return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
      
              return decorator
      
          def poll[T](
              self,
              event_type: str,
              interval: timedelta | None = None,
          ) -> (
              Callable[[Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]]
              | Callable[[Callable[..., T]], Callable[..., T]]
          ):
              """Decorator to register an event observer.
      
              Args:
                  event_type: Type of event to observe
                  interval: Optional polling interval for periodic checks
      
              Example:
                  @event_manager.observe("file_changed")
                  async def handle_file_change(event: FileEventData):
                      await process_file(event.path)
              """
      
              def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                  observer = EventObserver(func, interval=interval)
                  self._observers[event_type].append(observer)
      
                  @wraps(func)
                  async def wrapper(*args: Any, **kwargs: Any) -> Any:
                      result = await execute(func, *args, **kwargs)
                      # Convert result to event and emit
                      if self.enabled:
                          typ = type(result).__name__
                          meta = {"type": "function_result", "output_type": typ}
                          event = FunctionResultEventData(result=result, source=event_type, metadata=meta)
                          await self.emit_event(event)
                      return result
      
                  return wrapper
      
              return decorator
      

      __aenter__ async

      __aenter__() -> Self
      

      Allow using manager as async context manager.

      Source code in src/llmling_agent/messaging/event_manager.py
      296
      297
      298
      299
      300
      301
      302
      303
      304
      305
      306
      async def __aenter__(self) -> Self:
          """Allow using manager as async context manager."""
          if not self.enabled:
              return self
      
          # Set up triggers from config
          if self.node.context and self.node.context.config and self.node.context.config.triggers:
              for trigger in self.node.context.config.triggers:
                  await self.add_source(trigger)
      
          return self
      

      __aexit__ async

      __aexit__(
          exc_type: type[BaseException] | None,
          exc_val: BaseException | None,
          exc_tb: TracebackType | None,
      ) -> None
      

      Clean up when exiting context.

      Source code in src/llmling_agent/messaging/event_manager.py
      308
      309
      310
      311
      312
      313
      314
      315
      async def __aexit__(
          self,
          exc_type: type[BaseException] | None,
          exc_val: BaseException | None,
          exc_tb: TracebackType | None,
      ) -> None:
          """Leave the manager's async context by running ``cleanup()``.

          The exception details are not inspected, and nothing is returned, so an
          exception raised inside the context keeps propagating.
          """
          teardown = self.cleanup()
          await teardown
      

      __init__

      __init__(
          node: MessageNode[Any, Any], enable_events: bool = True, auto_run: bool = True
      ) -> None
      

      Initialize event manager.

      Parameters:

      Name Type Description Default
      node MessageNode[Any, Any]

      Agent to manage events for

      required
      enable_events bool

      Whether to enable event processing

      True
      auto_run bool

      Whether to automatically call the node's run() with the prompt produced by each event

      True
      Source code in src/llmling_agent/messaging/event_manager.py
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      def __init__(
          self,
          node: MessageNode[Any, Any],
          enable_events: bool = True,
          auto_run: bool = True,
      ) -> None:
          """Set up an event manager for ``node``.

          Args:
              node: Node whose events are managed
              enable_events: Initial value of the ``enabled`` flag
              auto_run: Initial value of the ``auto_run`` flag
          """
          # Owner and behaviour switches
          self.node = node
          self.enabled = enable_events
          self.auto_run = auto_run
          # Registries, all empty at start: sources by name, plain callbacks, and
          # observers grouped by event type.
          self._sources: dict[str, EventSource] = {}
          self._callbacks: list[EventCallback] = []
          self._observers: defaultdict[str, list[EventObserver]] = defaultdict(list)
      

      add_callback

      add_callback(callback: EventCallback) -> None
      

      Register an event callback.

      Source code in src/llmling_agent/messaging/event_manager.py
      71
      72
      73
      def add_callback(self, callback: EventCallback) -> None:
          """Append ``callback`` to the list of registered event callbacks.

          No de-duplication is performed: registering the same callback twice
          stores it twice.
          """
          registered = self._callbacks
          registered.append(callback)
      

      add_email_watch async

      add_email_watch(
          host: str,
          username: str,
          password: str,
          *,
          name: str | None = None,
          port: int = 993,
          folder: str = "INBOX",
          ssl: bool = True,
          check_interval: int = 60,
          mark_seen: bool = True,
          filters: dict[str, str] | None = None,
          max_size: int | None = None
      ) -> EventSource
      

      Add email monitoring event source.

      Parameters:

      Name Type Description Default
      host str

      IMAP server hostname

      required
      username str

      Email account username

      required
      password str

      Account password or app password

      required
      name str | None

      Optional source name

      None
      port int

      Server port (default: 993 for IMAP SSL)

      993
      folder str

      Mailbox to monitor

      'INBOX'
      ssl bool

      Whether to use SSL/TLS

      True
      check_interval int

      Seconds between checks

      60
      mark_seen bool

      Whether to mark processed emails as seen

      True
      filters dict[str, str] | None

      Optional email filtering criteria

      None
      max_size int | None

      Maximum email size in bytes

      None
      Source code in src/llmling_agent/messaging/event_manager.py
      182
      183
      184
      185
      186
      187
      188
      189
      190
      191
      192
      193
      194
      195
      196
      197
      198
      199
      200
      201
      202
      203
      204
      205
      206
      207
      208
      209
      210
      211
      212
      213
      214
      215
      216
      217
      218
      219
      220
      221
      222
      223
      224
      225
      async def add_email_watch(
          self,
          host: str,
          username: str,
          password: str,
          *,
          name: str | None = None,
          port: int = 993,
          folder: str = "INBOX",
          ssl: bool = True,
          check_interval: int = 60,
          mark_seen: bool = True,
          filters: dict[str, str] | None = None,
          max_size: int | None = None,
      ) -> EventSource:
          """Add email monitoring event source.

          Args:
              host: IMAP server hostname
              username: Email account username
              password: Account password or app password
              name: Optional source name (default: first unused ``email_<n>``)
              port: Server port (default: 993 for IMAP SSL)
              folder: Mailbox to monitor
              ssl: Whether to use SSL/TLS
              check_interval: Seconds between checks
              mark_seen: Whether to mark processed emails as seen
              filters: Optional email filtering criteria
              max_size: Maximum email size in bytes

          Returns:
              The event source returned by ``add_source()``.
          """
          if not name:
              # A purely len()-based name can clash with an existing source once
              # another source has been removed, so probe for a free index.
              index = len(self._sources)
              while (name := f"email_{index}") in self._sources:
                  index += 1
          config = EmailConfig(
              name=name,
              host=host,
              username=username,
              password=SecretStr(password),
              port=port,
              folder=folder,
              ssl=ssl,
              check_interval=check_interval,
              mark_seen=mark_seen,
              filters=filters or {},
              max_size=max_size,
          )
          return await self.add_source(config)
      

      add_file_watch async

      add_file_watch(
          paths: str | Sequence[str],
          *,
          name: str | None = None,
          extensions: list[str] | None = None,
          ignore_paths: list[str] | None = None,
          recursive: bool = True,
          debounce: int = 1600
      ) -> EventSource
      

      Add file system watch event source.

      Parameters:

      Name Type Description Default
      paths str | Sequence[str]

      Paths or patterns to watch

      required
      name str | None

      Optional source name (default: generated from paths)

      None
      extensions list[str] | None

      File extensions to monitor

      None
      ignore_paths list[str] | None

      Paths to ignore

      None
      recursive bool

      Whether to watch subdirectories

      True
      debounce int

      Minimum time between events (ms)

      1600
      Source code in src/llmling_agent/messaging/event_manager.py
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      async def add_file_watch(
          self,
          paths: str | Sequence[str],
          *,
          name: str | None = None,
          extensions: list[str] | None = None,
          ignore_paths: list[str] | None = None,
          recursive: bool = True,
          debounce: int = 1600,
      ) -> EventSource:
          """Add file system watch event source.

          Args:
              paths: Paths or patterns to watch (a single string is treated as one path)
              name: Optional source name (default: first unused ``file_watch_<n>``)
              extensions: File extensions to monitor
              ignore_paths: Paths to ignore
              recursive: Whether to watch subdirectories
              debounce: Minimum time between events (ms)

          Returns:
              The event source returned by ``add_source()``.
          """
          if not name:
              # A purely len()-based name can clash with an existing source once
              # another source has been removed, so probe for a free index.
              index = len(self._sources)
              while (name := f"file_watch_{index}") in self._sources:
                  index += 1
          path_list = [paths] if isinstance(paths, str) else list(paths)
          config = FileWatchConfig(
              name=name,
              paths=path_list,
              extensions=extensions,
              ignore_paths=ignore_paths,
              recursive=recursive,
              debounce=debounce,
          )
          return await self.add_source(config)
      

      add_source async

      add_source(config: EventConfig) -> EventSource
      

      Add and start a new event source.

      Parameters:

      Name Type Description Default
      config EventConfig

      Event source configuration

      required

      Raises:

      Type Description
      ValueError

      If a source with the same name already exists (a failure while creating or starting the source is raised as RuntimeError instead)

      Source code in src/llmling_agent/messaging/event_manager.py
      227
      228
      229
      230
      231
      232
      233
      234
      235
      236
      237
      238
      239
      240
      241
      242
      243
      244
      245
      246
      247
      248
      249
      250
      251
      252
      253
      254
      255
      256
      async def add_source(self, config: EventConfig) -> EventSource:
          """Add and start a new event source.

          Args:
              config: Event source configuration

          Returns:
              The entered event source, whose events are processed by a task
              created on the node's task manager.

          Raises:
              ValueError: If a source with the same name already exists
              RuntimeError: If the source cannot be created or started
          """
          logger.debug("Setting up event source", name=config.name, type=config.type)
          from evented.base import EventSource

          if config.name in self._sources:
              msg = f"Event source already exists: {config.name}"
              raise ValueError(msg)

          entered_source: EventSource | None = None
          try:
              source = EventSource.from_config(config)
              await source.__aenter__()
              entered_source = source
              self._sources[config.name] = source
              # Start processing events
              name = f"event_processor_{config.name}"
              self.node.task_manager.create_task(self._process_events(source), name=name)
              logger.debug("Added event source", name=config.name)
          except Exception as e:
              msg = "Failed to add event source"
              logger.exception(msg, name=config.name)
              # Roll back: a half-started source must neither stay registered nor
              # stay open.
              self._sources.pop(config.name, None)
              if entered_source is not None:
                  try:
                      await entered_source.__aexit__(None, None, None)
                  except Exception:
                      logger.exception("Error closing event source", name=config.name)
              raise RuntimeError(msg) from e
          else:
              return source
      

      add_timed_event async

      add_timed_event(
          schedule: str,
          prompt: str,
          *,
          name: str | None = None,
          timezone: str | None = None,
          skip_missed: bool = False
      ) -> TimeEventSource
      

      Add time-based event source.

      Parameters:

      Name Type Description Default
      schedule str

      Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am)

      required
      prompt str

      Prompt to send when triggered

      required
      name str | None

      Optional source name

      None
      timezone str | None

      Optional timezone (system default if None)

      None
      skip_missed bool

      Whether to skip missed executions

      False
      Source code in src/llmling_agent/messaging/event_manager.py
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      169
      170
      171
      172
      173
      174
      175
      176
      177
      178
      179
      180
      async def add_timed_event(
          self,
          schedule: str,
          prompt: str,
          *,
          name: str | None = None,
          timezone: str | None = None,
          skip_missed: bool = False,
      ) -> TimeEventSource:
          """Add time-based event source.

          Builds a ``TimeEventConfig`` from the arguments and registers it via
          ``add_source``.

          Args:
              schedule: Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am)
              prompt: Prompt to send when triggered
              name: Optional source name. When omitted (or empty), a free
                  ``timed_<n>`` name is generated.
              timezone: Optional timezone (system default if None)
              skip_missed: Whether to skip missed executions

          Returns:
              The started event source, as returned by ``add_source``.
          """
          if not name:
              # Start from the number of registered sources and bump the index
              # until the name is free: after a source has been removed, the
              # plain count can match a name that is still registered.
              index = len(self._sources)
              name = f"timed_{index}"
              while name in self._sources:
                  index += 1
                  name = f"timed_{index}"
          config = TimeEventConfig(
              name=name,
              schedule=schedule,
              prompt=prompt,
              timezone=timezone,
              skip_missed=skip_missed,
          )
          return await self.add_source(config)  # type: ignore
      

      add_webhook async

      add_webhook(
          path: str, *, name: str | None = None, port: int = 8000, secret: str | None = None
      ) -> EventSource
      

      Add webhook event source.

      Parameters:

      Name Type Description Default
      path str

      URL path to listen on

      required
      name str | None

      Optional source name

      None
      port int

      Port to listen on

      8000
      secret str | None

      Optional secret for request validation

      None
      Source code in src/llmling_agent/messaging/event_manager.py
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      async def add_webhook(
          self,
          path: str,
          *,
          name: str | None = None,
          port: int = 8000,
          secret: str | None = None,
      ) -> EventSource:
          """Add webhook event source.

          Builds a ``WebhookConfig`` from the arguments and registers it via
          ``add_source``.

          Args:
              path: URL path to listen on
              name: Optional source name. When omitted (or empty), a free
                  ``webhook_<n>`` name is generated.
              port: Port to listen on
              secret: Optional secret for request validation; an empty string
                  is treated like ``None``

          Returns:
              The started event source, as returned by ``add_source``.
          """
          if not name:
              # Start from the number of registered sources and bump the index
              # until the name is free: after a source has been removed, the
              # plain count can match a name that is still registered.
              index = len(self._sources)
              name = f"webhook_{index}"
              while name in self._sources:
                  index += 1
                  name = f"webhook_{index}"
          sec = SecretStr(secret) if secret else None
          config = WebhookConfig(name=name, path=path, port=port, secret=sec)
          return await self.add_source(config)
      

      cleanup async

      cleanup() -> None
      

      Clean up all event sources and tasks.

      Source code in src/llmling_agent/messaging/event_manager.py
      289
      290
      291
      292
      293
      294
      async def cleanup(self) -> None:
          """Disable the manager and remove every registered event source.

          Sources are removed one after another through ``remove_source``.
          """
          self.enabled = False

          # Snapshot the names first: the registry is modified while we remove.
          registered_names = list(self._sources.keys())
          for source_name in registered_names:
              await self.remove_source(source_name)
      

      emit_event async

      emit_event(event: EventData) -> None
      

      Emit event to all callbacks and optionally handle via node.

      Source code in src/llmling_agent/messaging/event_manager.py
       79
       80
       81
       82
       83
       84
       85
       86
       87
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      async def emit_event(self, event: EventData) -> None:
          """Emit event to all callbacks and optionally handle via node.

          Does nothing while the manager is disabled. Otherwise every registered
          callback is invoked with ``event`` (awaitable results are awaited),
          then, if ``auto_run`` is set, the event's prompt is run on the node.
          Errors from callbacks and from the default handler are logged and do
          not stop processing. Finally ``event_processed`` is emitted.

          Args:
              event: The event to dispatch
          """
          if not self.enabled:
              return

          # Run custom callbacks. Iterate over a snapshot so a callback that
          # unregisters itself (or another one) does not make the loop skip
          # the next entry.
          for callback in tuple(self._callbacks):
              try:
                  result = callback(event)
                  if isinstance(result, Awaitable):
                      await result
              except Exception:
                  # functools.partial objects and callable instances have no
                  # __name__; fall back to repr so logging itself cannot raise.
                  callback_name = getattr(callback, "__name__", None) or repr(callback)
                  logger.exception("Error in event callback", name=callback_name)

          # Run default handler if enabled
          if self.auto_run:
              try:
                  prompt = event.to_prompt()
                  if prompt:
                      await self.node.run(prompt)
              except Exception:
                  logger.exception("Error in default event handler")
          self.event_processed.emit(event)
      

      poll

      poll(
          event_type: str, interval: timedelta | None = None
      ) -> (
          Callable[
              [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
          ]
          | Callable[[Callable[..., T]], Callable[..., T]]
      )
      

      Decorator to register an event observer.

      Parameters:

      Name Type Description Default
      event_type str

      Type of event to observe

      required
      interval timedelta | None

      Optional polling interval for periodic checks

      None
      Example

      @event_manager.poll("file_changed") async def handle_file_change(event: FileEventData): await process_file(event.path)

      Source code in src/llmling_agent/messaging/event_manager.py
      385
      386
      387
      388
      389
      390
      391
      392
      393
      394
      395
      396
      397
      398
      399
      400
      401
      402
      403
      404
      405
      406
      407
      408
      409
      410
      411
      412
      413
      414
      415
      416
      417
      418
      419
      420
      421
      422
      def poll[T](
          self,
          event_type: str,
          interval: timedelta | None = None,
      ) -> (
          Callable[[Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]]
          | Callable[[Callable[..., T]], Callable[..., T]]
      ):
          """Decorator to register an event observer.
      
          Args:
              event_type: Type of event to observe
              interval: Optional polling interval for periodic checks
      
          Example:
              @event_manager.observe("file_changed")
              async def handle_file_change(event: FileEventData):
                  await process_file(event.path)
          """
      
          def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
              observer = EventObserver(func, interval=interval)
              self._observers[event_type].append(observer)
      
              @wraps(func)
              async def wrapper(*args: Any, **kwargs: Any) -> Any:
                  result = await execute(func, *args, **kwargs)
                  # Convert result to event and emit
                  if self.enabled:
                      typ = type(result).__name__
                      meta = {"type": "function_result", "output_type": typ}
                      event = FunctionResultEventData(result=result, source=event_type, metadata=meta)
                      await self.emit_event(event)
                  return result
      
              return wrapper
      
          return decorator
      

      remove_callback

      remove_callback(callback: EventCallback) -> None
      

      Remove a previously registered callback.

      Source code in src/llmling_agent/messaging/event_manager.py
      75
      76
      77
      def remove_callback(self, callback: EventCallback) -> None:
          """Unregister a callback that was added earlier.

          Any error raised by the underlying container when ``callback`` is not
          registered is propagated unchanged.
          """
          registered = self._callbacks
          registered.remove(callback)
      

      remove_source async

      remove_source(name: str) -> None
      

      Stop and remove an event source.

      Parameters:

      Name Type Description Default
      name str

      Name of source to remove

      required
      Source code in src/llmling_agent/messaging/event_manager.py
      258
      259
      260
      261
      262
      263
      264
      265
      266
      async def remove_source(self, name: str) -> None:
          """Stop and remove an event source.

          The source is taken out of the registry and its async context is
          exited. Unknown names are ignored.

          Args:
              name: Name of source to remove
          """
          source = self._sources.pop(name, None)
          if not source:
              # Nothing registered under this name.
              return
          await source.__aexit__(None, None, None)
          logger.debug("Removed event source", name=name)
      

      track

      track(
          event_name: str | None = None, **event_metadata: Any
      ) -> (
          Callable[
              [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
          ]
          | Callable[[Callable[..., T]], Callable[..., T]]
      )
      

      Track function calls as events.

      Parameters:

      Name Type Description Default
      event_name str | None

      Optional name for the event (defaults to function name)

      None
      **event_metadata Any

      Additional metadata to include with event

      {}
      Example

      @event_manager.track("user_search") async def search_docs(query: str) -> list[Doc]: results = await search(query) return results # This result becomes event data

      Source code in src/llmling_agent/messaging/event_manager.py
      317
      318
      319
      320
      321
      322
      323
      324
      325
      326
      327
      328
      329
      330
      331
      332
      333
      334
      335
      336
      337
      338
      339
      340
      341
      342
      343
      344
      345
      346
      347
      348
      349
      350
      351
      352
      353
      354
      355
      356
      357
      358
      359
      360
      361
      362
      363
      364
      365
      366
      367
      368
      369
      370
      371
      372
      373
      374
      375
      376
      377
      378
      379
      380
      381
      382
      383
      def track[T](
          self,
          event_name: str | None = None,
          **event_metadata: Any,
      ) -> (
          Callable[[Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]]
          | Callable[[Callable[..., T]], Callable[..., T]]
      ):
          """Track function calls as events.
      
          Args:
              event_name: Optional name for the event (defaults to function name)
              **event_metadata: Additional metadata to include with event
      
          Example:
              @event_manager.track("user_search")
              async def search_docs(query: str) -> list[Doc]:
                  results = await search(query)
                  return results  # This result becomes event data
          """
      
          def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
              name = event_name or func.__name__
      
              @wraps(func)
              async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                  start_time = get_now()
                  meta = {"args": args, "kwargs": kwargs, **event_metadata}
                  try:
                      result = await func(*args, **kwargs)
                      if self.enabled:
                          meta |= {"status": "success", "duration": get_now() - start_time}
                          event = EventData.create(name, content=result, metadata=meta)
                          await self.emit_event(event)
                  except Exception as e:
                      if self.enabled:
                          dur = get_now() - start_time
                          meta |= {"status": "error", "error": str(e), "duration": dur}
                          event = EventData.create(name, content=str(e), metadata=meta)
                          await self.emit_event(event)
                      raise
                  else:
                      return result
      
              @wraps(func)
              def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                  start_time = get_now()
                  meta = {"args": args, "kwargs": kwargs, **event_metadata}
                  try:
                      result = func(*args, **kwargs)
                      if self.enabled:
                          meta |= {"status": "success", "duration": get_now() - start_time}
                          event = EventData.create(name, content=result, metadata=meta)
                          self.node.task_manager.run_background(self.emit_event(event))
                  except Exception as e:
                      if self.enabled:
                          dur = get_now() - start_time
                          meta |= {"status": "error", "error": str(e), "duration": dur}
                          event = EventData.create(name, content=str(e), metadata=meta)
                          self.node.task_manager.run_background(self.emit_event(event))
                      raise
                  else:
                      return result
      
              return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
      
          return decorator
      

      EventObserver dataclass

      Registered event observer.

      Source code in src/llmling_agent/messaging/event_manager.py
      425
      426
      427
      428
      429
      430
      431
      432
      433
      434
      435
      436
      437
      438
      @dataclass
      class EventObserver:
          """Registered event observer.

          Bundles an observer callback with its optional polling settings.
          Calling the instance forwards the event to the callback.
          """

          # Function invoked with the event.
          callback: Callable[..., Any]
          # Optional polling interval; not read by this class itself.
          interval: timedelta | None = None
          # Optional timestamp field; not updated by this class itself.
          last_run: datetime | None = None

          async def __call__(self, event: EventData) -> None:
              """Pass ``event`` to the callback, logging instead of raising on errors."""
              handler = self.callback
              try:
                  await execute(handler, event)
              except Exception:
                  logger.exception("Error in event observer")
      

      __call__ async

      __call__(event: EventData) -> None
      

      Handle an event.

      Source code in src/llmling_agent/messaging/event_manager.py
      433
      434
      435
      436
      437
      438
      async def __call__(self, event: EventData) -> None:
          """Pass ``event`` to the callback, logging instead of raising on errors."""
          handler = self.callback
          try:
              await execute(handler, event)
          except Exception:
              logger.exception("Error in event observer")