
event_manager

Class info

Classes

Name Children Inherits
EmailConfig
    llmling_agent.models.events
    Email event source configuration.
EventData
    llmling_agent.messaging.events
    Base class for event data.
EventManager
    llmling_agent.messaging.event_manager
    Manages multiple event sources and their lifecycles.
EventObserver
    llmling_agent.messaging.event_manager
    Registered event observer.
FileWatchConfig
    llmling_agent.models.events
    File watching event source.
FunctionResultEventData
    llmling_agent.messaging.events
    Event from a function execution result.
TimeEventConfig
    llmling_agent.models.events
    Time-based event source configuration.
WebhookConfig
    llmling_agent.models.events
    Webhook event source.

                🛈 DocStrings

                Event manager for handling multiple event sources.

                EventManager

                Manages multiple event sources and their lifecycles.

                Source code in src/llmling_agent/messaging/event_manager.py
                class EventManager:
                    """Manages multiple event sources and their lifecycles."""
                
                    event_processed = Signal(EventData)
                
                    def __init__(
                        self,
                        node: MessageEmitter[Any, Any],
                        enable_events: bool = True,
                        auto_run: bool = True,
                    ):
                        """Initialize event manager.
                
                        Args:
                            node: Agent to manage events for
                            enable_events: Whether to enable event processing
                            auto_run: Whether to automatically call run() for event callbacks
                        """
                        self.node = node
                        self.enabled = enable_events
                        self._sources: dict[str, EventSource] = {}
                        self._callbacks: list[EventCallback] = []
                        self.auto_run = auto_run
                        self._observers: dict[str, list[EventObserver]] = {}
                
                    async def _default_handler(self, event: EventData):
                        """Default event handler that converts events to node runs."""
                        if prompt := event.to_prompt():  # Only run if event provides a prompt
                            await self.node.run(prompt)
                
                    def add_callback(self, callback: EventCallback):
                        """Register an event callback."""
                        self._callbacks.append(callback)
                
                    def remove_callback(self, callback: EventCallback):
                        """Remove a previously registered callback."""
                        self._callbacks.remove(callback)
                
                    async def emit_event(self, event: EventData):
                        """Emit event to all callbacks and optionally handle via node."""
                        if not self.enabled:
                            return
                
                        # Run custom callbacks
                        for callback in self._callbacks:
                            try:
                                result = callback(event)
                                if isinstance(result, Awaitable):
                                    await result
                            except Exception:
                                logger.exception("Error in event callback %r", callback.__name__)
                
                        # Run default handler if enabled
                        if self.auto_run:
                            try:
                                prompt = event.to_prompt()
                                if prompt:
                                    await self.node.run(prompt)
                            except Exception:
                                logger.exception("Error in default event handler")
                        self.event_processed.emit(event)
                
                    async def add_file_watch(
                        self,
                        paths: str | Sequence[str],
                        *,
                        name: str | None = None,
                        extensions: list[str] | None = None,
                        ignore_paths: list[str] | None = None,
                        recursive: bool = True,
                        debounce: int = 1600,
                    ) -> EventSource:
                        """Add file system watch event source.
                
                        Args:
                            paths: Paths or patterns to watch
                            name: Optional source name (default: generated from paths)
                            extensions: File extensions to monitor
                            ignore_paths: Paths to ignore
                            recursive: Whether to watch subdirectories
                            debounce: Minimum time between events (ms)
                        """
                        path_list = [paths] if isinstance(paths, str) else list(paths)
                        config = FileWatchConfig(
                            name=name or f"file_watch_{len(self._sources)}",
                            paths=path_list,
                            extensions=extensions,
                            ignore_paths=ignore_paths,
                            recursive=recursive,
                            debounce=debounce,
                        )
                        return await self.add_source(config)
                
                    async def add_webhook(
                        self,
                        path: str,
                        *,
                        name: str | None = None,
                        port: int = 8000,
                        secret: str | None = None,
                    ) -> EventSource:
                        """Add webhook event source.
                
                        Args:
                            path: URL path to listen on
                            name: Optional source name
                            port: Port to listen on
                            secret: Optional secret for request validation
                        """
                        name = name or f"webhook_{len(self._sources)}"
                        config = WebhookConfig(name=name, path=path, port=port, secret=secret)
                        return await self.add_source(config)
                
                    async def add_timed_event(
                        self,
                        schedule: str,
                        prompt: str,
                        *,
                        name: str | None = None,
                        timezone: str | None = None,
                        skip_missed: bool = False,
                    ) -> TimeEventSource:
                        """Add time-based event source.
                
                        Args:
                            schedule: Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am)
                            prompt: Prompt to send when triggered
                            name: Optional source name
                            timezone: Optional timezone (system default if None)
                            skip_missed: Whether to skip missed executions
                        """
                        config = TimeEventConfig(
                            name=name or f"timed_{len(self._sources)}",
                            schedule=schedule,
                            prompt=prompt,
                            timezone=timezone,
                            skip_missed=skip_missed,
                        )
                        return await self.add_source(config)  # type: ignore
                
                    async def add_email_watch(
                        self,
                        host: str,
                        username: str,
                        password: str,
                        *,
                        name: str | None = None,
                        port: int = 993,
                        folder: str = "INBOX",
                        ssl: bool = True,
                        check_interval: int = 60,
                        mark_seen: bool = True,
                        filters: dict[str, str] | None = None,
                        max_size: int | None = None,
                    ) -> EventSource:
                        """Add email monitoring event source.
                
                        Args:
                            host: IMAP server hostname
                            username: Email account username
                            password: Account password or app password
                            name: Optional source name
                            port: Server port (default: 993 for IMAP SSL)
                            folder: Mailbox to monitor
                            ssl: Whether to use SSL/TLS
                            check_interval: Seconds between checks
                            mark_seen: Whether to mark processed emails as seen
                            filters: Optional email filtering criteria
                            max_size: Maximum email size in bytes
                        """
                        config = EmailConfig(
                            name=name or f"email_{len(self._sources)}",
                            host=host,
                            username=username,
                            password=SecretStr(password),
                            port=port,
                            folder=folder,
                            ssl=ssl,
                            check_interval=check_interval,
                            mark_seen=mark_seen,
                            filters=filters or {},
                            max_size=max_size,
                        )
                        return await self.add_source(config)
                
                    async def add_source(self, config: EventConfig) -> EventSource:
                        """Add and start a new event source.
                
                        Args:
                            config: Event source configuration
                
                        Raises:
                            ValueError: If a source with this name already exists
                            RuntimeError: If the source cannot be created or connected
                        """
                        logger.debug("Setting up event source: %s (%s)", config.name, config.type)
                        from llmling_agent_events.base import EventSource
                
                        if config.name in self._sources:
                            msg = f"Event source already exists: {config.name}"
                            raise ValueError(msg)
                
                        try:
                            source = EventSource.from_config(config)
                            await source.connect()
                            self._sources[config.name] = source
                            # Start processing events
                            name = f"event_processor_{config.name}"
                            self.node.create_task(self._process_events(source), name=name)
                            logger.debug("Added event source: %s", config.name)
                        except Exception as e:
                            msg = f"Failed to add event source {config.name}"
                            logger.exception(msg)
                            raise RuntimeError(msg) from e
                        else:
                            return source
                
                    async def remove_source(self, name: str):
                        """Stop and remove an event source.
                
                        Args:
                            name: Name of source to remove
                        """
                        if source := self._sources.pop(name, None):
                            await source.disconnect()
                            logger.debug("Removed event source: %s", name)
                
                    async def _process_events(self, source: EventSource):
                        """Process events from a source.
                
                        Args:
                            source: Event source to process
                        """
                        try:
                            # Get the async iterator from the coroutine
                            async for event in source.events():
                                if not self.enabled:
                                    logger.debug("Event processing disabled, skipping event")
                                    continue
                                await self.emit_event(event)
                
                        except asyncio.CancelledError:
                            logger.debug("Event processing cancelled")
                            raise
                
                        except Exception:
                            logger.exception("Error processing events")
                
                    async def cleanup(self):
                        """Clean up all event sources and tasks."""
                        self.enabled = False
                
                        for name in list(self._sources):
                            await self.remove_source(name)
                
                    async def __aenter__(self) -> Self:
                        """Allow using manager as async context manager."""
                        if not self.enabled:
                            return self
                
                        # Set up triggers from config
                        if (
                            self.node.context
                            and self.node.context.config
                            and self.node.context.config.triggers
                        ):
                            for trigger in self.node.context.config.triggers:
                                await self.add_source(trigger)
                
                        return self
                
                    async def __aexit__(self, *exc: object):
                        """Clean up when exiting context."""
                        await self.cleanup()
                
                    @overload
                    def track[T](
                        self,
                        event_name: str | None = None,
                        **event_metadata: Any,
                    ) -> Callable[[Callable[..., T]], Callable[..., T]]: ...
                
                    @overload
                    def track[T](
                        self,
                        event_name: str | None = None,
                        **event_metadata: Any,
                    ) -> Callable[
                        [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
                    ]: ...
                
                    def track(
                        self,
                        event_name: str | None = None,
                        **event_metadata: Any,
                    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                        """Track function calls as events.
                
                        Args:
                            event_name: Optional name for the event (defaults to function name)
                            **event_metadata: Additional metadata to include with event
                
                        Example:
                            @event_manager.track("user_search")
                            async def search_docs(query: str) -> list[Doc]:
                                results = await search(query)
                                return results  # This result becomes event data
                        """
                
                        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                            name = event_name or func.__name__
                
                            @wraps(func)
                            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                                start_time = datetime.now()
                                try:
                                    result = await func(*args, **kwargs)
                                    if self.enabled:
                                        meta = {
                                            "status": "success",
                                            "duration": datetime.now() - start_time,
                                            "args": args,
                                            "kwargs": kwargs,
                                            **event_metadata,
                                        }
                                        event = EventData.create(name, content=result, metadata=meta)
                                        await self.emit_event(event)
                                except Exception as e:
                                    if self.enabled:
                                        meta = {
                                            "status": "error",
                                            "error": str(e),
                                            "duration": datetime.now() - start_time,
                                            "args": args,
                                            "kwargs": kwargs,
                                            **event_metadata,
                                        }
                                        event = EventData.create(name, content=str(e), metadata=meta)
                                        await self.emit_event(event)
                                    raise
                                else:
                                    return result
                
                            @wraps(func)
                            def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                                start_time = datetime.now()
                                try:
                                    result = func(*args, **kwargs)
                                    if self.enabled:
                                        meta = {
                                            "status": "success",
                                            "duration": datetime.now() - start_time,
                                            "args": args,
                                            "kwargs": kwargs,
                                            **event_metadata,
                                        }
                                        event = EventData.create(name, content=result, metadata=meta)
                                        self.node.run_background(self.emit_event(event))
                                except Exception as e:
                                    if self.enabled:
                                        meta = {
                                            "status": "error",
                                            "error": str(e),
                                            "duration": datetime.now() - start_time,
                                            "args": args,
                                            "kwargs": kwargs,
                                            **event_metadata,
                                        }
                                        event = EventData.create(name, content=str(e), metadata=meta)
                                        self.node.run_background(self.emit_event(event))
                                    raise
                                else:
                                    return result
                
                            return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
                
                        return decorator
                
                    @overload
                    def poll[T](
                        self,
                        event_type: str,
                        interval: timedelta | None = None,
                    ) -> Callable[[Callable[..., T]], Callable[..., T]]: ...
                
                    @overload
                    def poll[T](
                        self,
                        event_type: str,
                        interval: timedelta | None = None,
                    ) -> Callable[
                        [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
                    ]: ...
                
                    def poll(
                        self,
                        event_type: str,
                        interval: timedelta | None = None,
                    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                        """Decorator to register an event observer.
                
                        Args:
                            event_type: Type of event to observe
                            interval: Optional polling interval for periodic checks
                
                        Example:
                            @event_manager.poll("file_changed")
                            async def handle_file_change(event: FileEventData):
                                await process_file(event.path)
                        """
                
                        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                            observer = EventObserver(func, interval=interval)
                            self._observers.setdefault(event_type, []).append(observer)
                
                            @wraps(func)
                            async def wrapper(*args: Any, **kwargs: Any) -> Any:
                                result = await execute(func, *args, **kwargs)
                                # Convert result to event and emit
                                if self.enabled:
                                    typ = type(result).__name__
                                    meta = {"type": "function_result", "result_type": typ}
                                    event = FunctionResultEventData(
                                        result=result, source=event_type, metadata=meta
                                    )
                                    await self.emit_event(event)
                                return result
                
                            return wrapper
                
                        return decorator
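
The `track` decorator above picks an async or sync wrapper with `inspect.iscoroutinefunction` and records each call as a success or error event. The following is a minimal, self-contained sketch of that pattern only, not the library's implementation: the module-level `events` list and the `record` helper are hypothetical stand-ins for `EventData.create` and `emit_event`, and the example functions are invented for illustration.

```python
import asyncio
import inspect
from datetime import datetime
from functools import wraps
from typing import Any, Callable, Optional

# Hypothetical sink standing in for EventManager.emit_event.
events: list = []


def track(event_name: Optional[str] = None, **event_metadata: Any):
    """Record every call of the decorated function as a dict in `events`."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        name = event_name or func.__name__

        def record(status: str, content: Any, start: datetime, args, kwargs) -> None:
            meta = {
                "status": status,
                "duration": datetime.now() - start,
                "args": args,
                "kwargs": kwargs,
                **event_metadata,
            }
            events.append({"name": name, "content": content, "metadata": meta})

        @wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            start = datetime.now()
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                record("error", str(exc), start, args, kwargs)
                raise  # the caller still sees the original exception
            record("success", result, start, args, kwargs)
            return result

        @wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            start = datetime.now()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                record("error", str(exc), start, args, kwargs)
                raise
            record("success", result, start, args, kwargs)
            return result

        # Coroutine functions get the awaiting wrapper, everything else the sync one.
        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper

    return decorator


@track("user_search", team="docs")
async def search_docs(query: str) -> list:
    return [f"result for {query}"]


@track()
def divide(a: float, b: float) -> float:
    return a / b
```

Calling `asyncio.run(search_docs("cats"))` appends one `"user_search"` entry with status `"success"`. Calling `divide(1, 0)` appends an entry with status `"error"` and then re-raises `ZeroDivisionError`, mirroring the `raise` in both wrappers of the source above.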
                

                __aenter__ async

                __aenter__() -> Self
                

                Allow using manager as async context manager.

                Source code in src/llmling_agent/messaging/event_manager.py
                async def __aenter__(self) -> Self:
                    """Allow using manager as async context manager."""
                    if not self.enabled:
                        return self
                
                    # Set up triggers from config
                    if (
                        self.node.context
                        and self.node.context.config
                        and self.node.context.config.triggers
                    ):
                        for trigger in self.node.context.config.triggers:
                            await self.add_source(trigger)
                
                    return self
                

                __aexit__ async

                __aexit__(*exc: object)
                

                Clean up when exiting context.

                Source code in src/llmling_agent/messaging/event_manager.py
                async def __aexit__(self, *exc: object):
                    """Clean up when exiting context."""
                    await self.cleanup()
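
The two hooks above are meant to be used as a pair through `async with`. The following is a minimal sketch of that pattern; `FakeManager` is a hypothetical stand-in, not the real `EventManager`, and it only mirrors the documented behavior that `__aexit__` delegates to `cleanup()`, which disables the manager and removes every source.

```python
import asyncio


class FakeManager:
    """Hypothetical stand-in mirroring the documented enter/exit behavior."""

    def __init__(self) -> None:
        self.enabled = True
        self.sources: dict[str, str] = {"demo": "connected"}

    async def cleanup(self) -> None:
        # Mirrors the documented cleanup(): disable first, then drop sources.
        self.enabled = False
        self.sources.clear()

    async def __aenter__(self) -> "FakeManager":
        return self

    async def __aexit__(self, *exc: object) -> None:
        # Runs on normal exit and when the block raises.
        await self.cleanup()


async def run_with_manager() -> FakeManager:
    async with FakeManager() as manager:
        assert manager.enabled  # sources are live inside the block
    return manager
```

After the block exits, the stand-in is disabled and holds no sources, which is the state the real `cleanup()` listing suggests as well.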
                

                __init__

                __init__(
                    node: MessageEmitter[Any, Any], enable_events: bool = True, auto_run: bool = True
                )
                

                Initialize event manager.

                Parameters:

                Name Type Description Default
                node MessageEmitter[Any, Any]

                Agent to manage events for

                required
                enable_events bool

                Whether to enable event processing

                True
                auto_run bool

                Whether to automatically call run() for event callbacks

                True
                Source code in src/llmling_agent/messaging/event_manager.py
                def __init__(
                    self,
                    node: MessageEmitter[Any, Any],
                    enable_events: bool = True,
                    auto_run: bool = True,
                ):
                    """Initialize event manager.
                
                    Args:
                        node: Agent to manage events for
                        enable_events: Whether to enable event processing
                        auto_run: Whether to automatically call run() for event callbacks
                    """
                    self.node = node
                    self.enabled = enable_events
                    self._sources: dict[str, EventSource] = {}
                    self._callbacks: list[EventCallback] = []
                    self.auto_run = auto_run
                    self._observers: dict[str, list[EventObserver]] = {}
                

                _default_handler async

                _default_handler(event: EventData)
                

                Default event handler that converts events to node runs.

                Source code in src/llmling_agent/messaging/event_manager.py
                async def _default_handler(self, event: EventData):
                    """Default event handler that converts events to node runs."""
                    if prompt := event.to_prompt():  # Only run if event provides a prompt
                        await self.node.run(prompt)
                

                _process_events async

                _process_events(source: EventSource)
                

                Process events from a source.

                Parameters:

                Name Type Description Default
                source EventSource

                Event source to process

                required
                Source code in src/llmling_agent/messaging/event_manager.py
                async def _process_events(self, source: EventSource):
                    """Process events from a source.
                
                    Args:
                        source: Event source to process
                    """
                    try:
                        # Iterate over events as the source yields them
                        async for event in source.events():
                            if not self.enabled:
                                logger.debug("Event processing disabled, skipping event")
                                continue
                            await self.emit_event(event)
                
                    except asyncio.CancelledError:
                        logger.debug("Event processing cancelled")
                        raise
                
                    except Exception:
                        logger.exception("Error processing events")
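
The loop above follows a common asyncio pattern: consume an async iterator, skip items while processing is disabled, let cancellation propagate, and log everything else. A self-contained sketch of that pattern follows; `fake_events` and `process` are hypothetical names used only for illustration.

```python
import asyncio
import logging
from collections.abc import AsyncIterator, Callable

logger = logging.getLogger(__name__)


async def fake_events() -> AsyncIterator[str]:
    """Hypothetical source that yields three events and then stops."""
    for name in ("a", "b", "c"):
        yield name


async def process(
    events: AsyncIterator[str],
    is_enabled: Callable[[], bool],
    handled: list[str],
) -> None:
    try:
        async for event in events:
            if not is_enabled():
                continue  # drop events while processing is disabled
            handled.append(event)
    except asyncio.CancelledError:
        raise  # task cancellation must propagate to the caller
    except Exception:
        # Any other error ends the loop but is only logged.
        logger.exception("Error processing events")


async def collect(enabled: bool) -> list[str]:
    handled: list[str] = []
    await process(fake_events(), lambda: enabled, handled)
    return handled
```

Note that in this pattern an unexpected exception stops the loop for that source; nothing restarts it.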
                

                add_callback

                add_callback(callback: EventCallback)
                

                Register an event callback.

                Source code in src/llmling_agent/messaging/event_manager.py
                def add_callback(self, callback: EventCallback):
                    """Register an event callback."""
                    self._callbacks.append(callback)
                

                add_email_watch async

                add_email_watch(
                    host: str,
                    username: str,
                    password: str,
                    *,
                    name: str | None = None,
                    port: int = 993,
                    folder: str = "INBOX",
                    ssl: bool = True,
                    check_interval: int = 60,
                    mark_seen: bool = True,
                    filters: dict[str, str] | None = None,
                    max_size: int | None = None,
                ) -> EventSource
                

                Add email monitoring event source.

                Parameters:

                Name Type Description Default
                host str

                IMAP server hostname

                required
                username str

                Email account username

                required
                password str

                Account password or app password

                required
                name str | None

                Optional source name

                None
                port int

                Server port (default: 993 for IMAP SSL)

                993
                folder str

                Mailbox to monitor

                'INBOX'
                ssl bool

                Whether to use SSL/TLS

                True
                check_interval int

                Seconds between checks

                60
                mark_seen bool

                Whether to mark processed emails as seen

                True
                filters dict[str, str] | None

                Optional email filtering criteria

                None
                max_size int | None

                Maximum email size in bytes

                None
                Source code in src/llmling_agent/messaging/event_manager.py
                async def add_email_watch(
                    self,
                    host: str,
                    username: str,
                    password: str,
                    *,
                    name: str | None = None,
                    port: int = 993,
                    folder: str = "INBOX",
                    ssl: bool = True,
                    check_interval: int = 60,
                    mark_seen: bool = True,
                    filters: dict[str, str] | None = None,
                    max_size: int | None = None,
                ) -> EventSource:
                    """Add email monitoring event source.
                
                    Args:
                        host: IMAP server hostname
                        username: Email account username
                        password: Account password or app password
                        name: Optional source name
                        port: Server port (default: 993 for IMAP SSL)
                        folder: Mailbox to monitor
                        ssl: Whether to use SSL/TLS
                        check_interval: Seconds between checks
                        mark_seen: Whether to mark processed emails as seen
                        filters: Optional email filtering criteria
                        max_size: Maximum email size in bytes
                    """
                    config = EmailConfig(
                        name=name or f"email_{len(self._sources)}",
                        host=host,
                        username=username,
                        password=SecretStr(password),
                        port=port,
                        folder=folder,
                        ssl=ssl,
                        check_interval=check_interval,
                        mark_seen=mark_seen,
                        filters=filters or {},
                        max_size=max_size,
                    )
                    return await self.add_source(config)
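
A usage sketch based only on the signature shown above. The host, account, source name, and the `IMAP_PASSWORD` environment variable are placeholders chosen for illustration, and `manager` is assumed to be an existing `EventManager`.

```python
import asyncio
import os
from typing import Any


async def watch_inbox(manager: Any) -> Any:
    """Register an IMAP watch on `manager` (assumed to be an EventManager)."""
    return await manager.add_email_watch(
        host="imap.example.com",  # placeholder host
        username="bot@example.com",  # placeholder account
        password=os.environ.get("IMAP_PASSWORD", ""),  # hypothetical variable name
        name="support_inbox",
        folder="INBOX",
        check_interval=120,  # seconds between checks
        mark_seen=False,  # leave processed messages unread
    )
```

Everything after `password` is keyword-only, so the optional settings have to be passed by name.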
                

                add_file_watch async

                add_file_watch(
                    paths: str | Sequence[str],
                    *,
                    name: str | None = None,
                    extensions: list[str] | None = None,
                    ignore_paths: list[str] | None = None,
                    recursive: bool = True,
                    debounce: int = 1600,
                ) -> EventSource
                

                Add file system watch event source.

                Parameters:

                Name Type Description Default
                paths str | Sequence[str]

                Paths or patterns to watch

                required
                name str | None

                Optional source name (default: generated from paths)

                None
                extensions list[str] | None

                File extensions to monitor

                None
                ignore_paths list[str] | None

                Paths to ignore

                None
                recursive bool

                Whether to watch subdirectories

                True
                debounce int

                Minimum time between events (ms)

                1600
                Source code in src/llmling_agent/messaging/event_manager.py
                async def add_file_watch(
                    self,
                    paths: str | Sequence[str],
                    *,
                    name: str | None = None,
                    extensions: list[str] | None = None,
                    ignore_paths: list[str] | None = None,
                    recursive: bool = True,
                    debounce: int = 1600,
                ) -> EventSource:
                    """Add file system watch event source.
                
                    Args:
                        paths: Paths or patterns to watch
                        name: Optional source name (default: generated from paths)
                        extensions: File extensions to monitor
                        ignore_paths: Paths to ignore
                        recursive: Whether to watch subdirectories
                        debounce: Minimum time between events (ms)
                    """
                    path_list = [paths] if isinstance(paths, str) else list(paths)
                    config = FileWatchConfig(
                        name=name or f"file_watch_{len(self._sources)}",
                        paths=path_list,
                        extensions=extensions,
                        ignore_paths=ignore_paths,
                        recursive=recursive,
                        debounce=debounce,
                    )
                    return await self.add_source(config)
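
Two small details in the listing above are easy to miss: a single string is wrapped into a one-element list, and the fallback name is derived from the current number of sources. The sketch below isolates both; `normalize_paths` and `default_name` are hypothetical helper names, not part of the library.

```python
from __future__ import annotations

from collections.abc import Sequence


def normalize_paths(paths: str | Sequence[str]) -> list[str]:
    """Wrap a single string in a list; copy any other sequence."""
    return [paths] if isinstance(paths, str) else list(paths)


def default_name(prefix: str, existing: dict[str, object]) -> str:
    """Build a fallback name from the number of registered sources."""
    return f"{prefix}_{len(existing)}"
```

Because the fallback name depends on the count rather than on a running counter, removing an earlier source and then adding a new unnamed one may produce a name that is still in use, which `add_source` rejects with `ValueError`. Passing an explicit `name` avoids this.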
                

                add_source async

                add_source(config: EventConfig) -> EventSource
                

                Add and start a new event source.

                Parameters:

                Name Type Description Default
                config EventConfig

                Event source configuration

                required

                Raises:

                Type Description
                ValueError

                If a source with the same name already exists

                RuntimeError

                If the source cannot be created or connected

                Source code in src/llmling_agent/messaging/event_manager.py
                async def add_source(self, config: EventConfig) -> EventSource:
                    """Add and start a new event source.
                
                    Args:
                        config: Event source configuration
                
                    Raises:
                        ValueError: If a source with the same name already exists
                        RuntimeError: If the source cannot be created or connected
                    """
                    logger.debug("Setting up event source: %s (%s)", config.name, config.type)
                    from llmling_agent_events.base import EventSource
                
                    if config.name in self._sources:
                        msg = f"Event source already exists: {config.name}"
                        raise ValueError(msg)
                
                    try:
                        source = EventSource.from_config(config)
                        await source.connect()
                        self._sources[config.name] = source
                        # Start processing events
                        name = f"event_processor_{config.name}"
                        self.node.create_task(self._process_events(source), name=name)
                        logger.debug("Added event source: %s", config.name)
                    except Exception as e:
                        msg = f"Failed to add event source {config.name}"
                        logger.exception(msg)
                        raise RuntimeError(msg) from e
                    else:
                        return source
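
The listing above raises two different exception types: `ValueError` for a duplicate name (before the `try` block) and `RuntimeError`, chained to the original error, when creating or connecting the source fails. A minimal sketch of that control flow, using hypothetical `FakeSource` and `Registry` classes instead of the real ones:

```python
import asyncio


class FakeSource:
    """Hypothetical source; `fail=True` simulates a connection error."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail

    async def connect(self) -> None:
        if self.fail:
            raise ConnectionError("cannot connect")


class Registry:
    """Hypothetical registry mirroring the duplicate and failure handling."""

    def __init__(self) -> None:
        self.sources: dict[str, FakeSource] = {}

    async def add(self, name: str, source: FakeSource) -> FakeSource:
        if name in self.sources:
            # Raised outside the try block, so it is not wrapped.
            raise ValueError(f"Event source already exists: {name}")
        try:
            await source.connect()
        except Exception as e:
            # Wrap any setup failure and keep the cause attached.
            raise RuntimeError(f"Failed to add event source {name}") from e
        self.sources[name] = source
        return source
```

Callers that want to distinguish the two cases can catch `ValueError` and `RuntimeError` separately and inspect `__cause__` on the latter.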
                

                add_timed_event async

                add_timed_event(
                    schedule: str,
                    prompt: str,
                    *,
                    name: str | None = None,
                    timezone: str | None = None,
                    skip_missed: bool = False,
                ) -> TimeEventSource
                

                Add time-based event source.

                Parameters:

                Name Type Description Default
                schedule str

                Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am)

                required
                prompt str

                Prompt to send when triggered

                required
                name str | None

                Optional source name

                None
                timezone str | None

                Optional timezone (system default if None)

                None
                skip_missed bool

                Whether to skip missed executions

                False
                Source code in src/llmling_agent/messaging/event_manager.py
                async def add_timed_event(
                    self,
                    schedule: str,
                    prompt: str,
                    *,
                    name: str | None = None,
                    timezone: str | None = None,
                    skip_missed: bool = False,
                ) -> TimeEventSource:
                    """Add time-based event source.
                
                    Args:
                        schedule: Cron expression (e.g. "0 9 * * 1-5" for weekdays at 9am)
                        prompt: Prompt to send when triggered
                        name: Optional source name
                        timezone: Optional timezone (system default if None)
                        skip_missed: Whether to skip missed executions
                    """
                    config = TimeEventConfig(
                        name=name or f"timed_{len(self._sources)}",
                        schedule=schedule,
                        prompt=prompt,
                        timezone=timezone,
                        skip_missed=skip_missed,
                    )
                    return await self.add_source(config)  # type: ignore
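
A usage sketch based on the signature above. A standard cron expression has five fields (minute, hour, day of month, month, day of week), so `"0 9 * * 1-5"` fires at 09:00 on weekdays. The prompt text and source name are placeholders, `manager` is assumed to be an existing `EventManager`, and the IANA-style timezone string is an assumption about the accepted format.

```python
import asyncio
from typing import Any


async def schedule_standup(manager: Any) -> Any:
    """Register a weekday-morning trigger on `manager` (assumed EventManager)."""
    return await manager.add_timed_event(
        "0 9 * * 1-5",  # minute hour day-of-month month day-of-week
        "Summarize yesterday's open issues.",  # placeholder prompt
        name="daily_standup",
        timezone="Europe/Berlin",  # assumed format: IANA zone name
        skip_missed=True,  # do not replay runs missed while offline
    )
```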
                

                add_webhook async

                add_webhook(
                    path: str, *, name: str | None = None, port: int = 8000, secret: str | None = None
                ) -> EventSource
                

                Add webhook event source.

                Parameters:

                Name Type Description Default
                path str

                URL path to listen on

                required
                name str | None

                Optional source name

                None
                port int

                Port to listen on

                8000
                secret str | None

                Optional secret for request validation

                None
                Source code in src/llmling_agent/messaging/event_manager.py
                async def add_webhook(
                    self,
                    path: str,
                    *,
                    name: str | None = None,
                    port: int = 8000,
                    secret: str | None = None,
                ) -> EventSource:
                    """Add webhook event source.
                
                    Args:
                        path: URL path to listen on
                        name: Optional source name
                        port: Port to listen on
                        secret: Optional secret for request validation
                    """
                    name = name or f"webhook_{len(self._sources)}"
                    config = WebhookConfig(name=name, path=path, port=port, secret=secret)
                    return await self.add_source(config)
                

                cleanup async

                cleanup()
                

                Clean up all event sources and tasks.

                Source code in src/llmling_agent/messaging/event_manager.py
                async def cleanup(self):
                    """Clean up all event sources and tasks."""
                    self.enabled = False
                
                    for name in list(self._sources):
                        await self.remove_source(name)
                

                emit_event async

                emit_event(event: EventData)
                

                Emit event to all callbacks and optionally handle via node.

                Source code in src/llmling_agent/messaging/event_manager.py
                async def emit_event(self, event: EventData):
                    """Emit event to all callbacks and optionally handle via node."""
                    if not self.enabled:
                        return
                
                    # Run custom callbacks
                    for callback in self._callbacks:
                        try:
                            result = callback(event)
                            if isinstance(result, Awaitable):
                                await result
                        except Exception:
                            logger.exception("Error in event callback %r", callback.__name__)
                
                    # Run default handler if enabled
                    if self.auto_run:
                        try:
                            prompt = event.to_prompt()
                            if prompt:
                                await self.node.run(prompt)
                        except Exception:
                            logger.exception("Error in default event handler")
                    self.event_processed.emit(event)
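
The callback loop above accepts both plain functions and coroutine functions, and one failing callback does not stop the others. A self-contained sketch of that dispatch pattern; `dispatch` is a hypothetical helper, and it uses `inspect.isawaitable` where the listing uses an `isinstance` check against `Awaitable`.

```python
import asyncio
import inspect
import logging
from collections.abc import Callable, Iterable
from typing import Any

logger = logging.getLogger(__name__)


async def dispatch(callbacks: Iterable[Callable[[Any], Any]], event: Any) -> int:
    """Call every callback with `event`; return how many completed."""
    completed = 0
    for callback in callbacks:
        try:
            result = callback(event)
            if inspect.isawaitable(result):
                await result  # coroutine callbacks are awaited in order
            completed += 1
        except Exception:
            # Log and continue so one bad callback cannot block the rest.
            logger.exception("Error in event callback %r", callback)
    return completed
```

Callbacks run sequentially, so a slow coroutine callback delays the ones after it.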
                

                poll

                poll(
                    event_type: str, interval: timedelta | None = None
                ) -> Callable[[Callable[..., T]], Callable[..., T]]
                
                poll(
                    event_type: str, interval: timedelta | None = None
                ) -> Callable[
                    [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
                ]
                
                poll(
                    event_type: str, interval: timedelta | None = None
                ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
                

                Decorator to register an event observer.

                Parameters:

                Name Type Description Default
                event_type str

                Type of event to observe

                required
                interval timedelta | None

                Optional polling interval for periodic checks

                None
                Example

                @event_manager.poll("file_changed")
                async def handle_file_change(event: FileEventData):
                    await process_file(event.path)

                Source code in src/llmling_agent/messaging/event_manager.py
                def poll(
                    self,
                    event_type: str,
                    interval: timedelta | None = None,
                ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                    """Decorator to register an event observer.
                
                    Args:
                        event_type: Type of event to observe
                        interval: Optional polling interval for periodic checks
                
                    Example:
                        @event_manager.poll("file_changed")
                        async def handle_file_change(event: FileEventData):
                            await process_file(event.path)
                    """
                
                    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                        observer = EventObserver(func, interval=interval)
                        self._observers.setdefault(event_type, []).append(observer)
                
                        @wraps(func)
                        async def wrapper(*args: Any, **kwargs: Any) -> Any:
                            result = await execute(func, *args, **kwargs)
                            # Convert result to event and emit
                            if self.enabled:
                                typ = type(result).__name__
                                meta = {"type": "function_result", "result_type": typ}
                                event = FunctionResultEventData(
                                    result=result, source=event_type, metadata=meta
                                )
                                await self.emit_event(event)
                            return result
                
                        return wrapper
                
                    return decorator
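
Reading the listing above, the decorator does two things: it registers the function as an observer for `event_type`, and it returns a wrapper that emits the function's return value as an event. The sketch below reproduces that shape with a hypothetical `MiniManager` that records emitted results in a list instead of building `FunctionResultEventData` and running a node.

```python
import asyncio
import inspect
from functools import wraps
from typing import Any, Callable


class MiniManager:
    """Hypothetical stand-in; not the real EventManager."""

    def __init__(self) -> None:
        self.enabled = True
        self.observers: dict[str, list[Callable[..., Any]]] = {}
        self.emitted: list[dict[str, Any]] = []

    def poll(self, event_type: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            # Registration happens once, at decoration time.
            self.observers.setdefault(event_type, []).append(func)

            @wraps(func)
            async def wrapper(*args: Any, **kwargs: Any) -> Any:
                result = func(*args, **kwargs)
                if inspect.isawaitable(result):
                    result = await result
                if self.enabled:
                    # Stand-in for emitting a function-result event.
                    meta = {"source": event_type, "result": result}
                    meta["result_type"] = type(result).__name__
                    self.emitted.append(meta)
                return result

            return wrapper

        return decorator


manager = MiniManager()


@manager.poll("disk_usage")
def check_disk() -> int:
    return 42
```

As in the listing, the wrapper is always a coroutine function, so even a synchronous function such as `check_disk` has to be awaited once decorated.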
                

                remove_callback

                remove_callback(callback: EventCallback)
                

                Remove a previously registered callback.

                Source code in src/llmling_agent/messaging/event_manager.py
                def remove_callback(self, callback: EventCallback):
                    """Remove a previously registered callback."""
                    self._callbacks.remove(callback)
                

                remove_source async

                remove_source(name: str)
                

                Stop and remove an event source.

                Parameters:

                Name Type Description Default
                name str

                Name of source to remove

                required
                Source code in src/llmling_agent/messaging/event_manager.py
                async def remove_source(self, name: str):
                    """Stop and remove an event source.
                
                    Args:
                        name: Name of source to remove
                    """
                    if source := self._sources.pop(name, None):
                        await source.disconnect()
                        logger.debug("Removed event source: %s", name)
                

                track

                track(
                    event_name: str | None = None, **event_metadata: Any
                ) -> Callable[[Callable[..., T]], Callable[..., T]]
                
                track(
                    event_name: str | None = None, **event_metadata: Any
                ) -> Callable[
                    [Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]
                ]
                
                track(
                    event_name: str | None = None, **event_metadata: Any
                ) -> Callable[[Callable[..., Any]], Callable[..., Any]]
                

                Track function calls as events.

                Parameters:

                Name Type Description Default
                event_name str | None

                Optional name for the event (defaults to function name)

                None
                **event_metadata Any

                Additional metadata to include with event

                {}
                Example

                @event_manager.track("user_search")
                async def search_docs(query: str) -> list[Doc]:
                    results = await search(query)
                    return results  # This result becomes event data

                Source code in src/llmling_agent/messaging/event_manager.py
                def track(
                    self,
                    event_name: str | None = None,
                    **event_metadata: Any,
                ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
                    """Track function calls as events.
                
                    Args:
                        event_name: Optional name for the event (defaults to function name)
                        **event_metadata: Additional metadata to include with event
                
                    Example:
                        @event_manager.track("user_search")
                        async def search_docs(query: str) -> list[Doc]:
                            results = await search(query)
                            return results  # This result becomes event data
                    """
                
                    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                        name = event_name or func.__name__
                
                        @wraps(func)
                        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                            start_time = datetime.now()
                            try:
                                result = await func(*args, **kwargs)
                                if self.enabled:
                                    meta = {
                                        "status": "success",
                                        "duration": datetime.now() - start_time,
                                        "args": args,
                                        "kwargs": kwargs,
                                        **event_metadata,
                                    }
                                    event = EventData.create(name, content=result, metadata=meta)
                                    await self.emit_event(event)
                            except Exception as e:
                                if self.enabled:
                                    meta = {
                                        "status": "error",
                                        "error": str(e),
                                        "duration": datetime.now() - start_time,
                                        "args": args,
                                        "kwargs": kwargs,
                                        **event_metadata,
                                    }
                                    event = EventData.create(name, content=str(e), metadata=meta)
                                    await self.emit_event(event)
                                raise
                            else:
                                return result
                
                        @wraps(func)
                        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                            start_time = datetime.now()
                            try:
                                result = func(*args, **kwargs)
                                if self.enabled:
                                    meta = {
                                        "status": "success",
                                        "duration": datetime.now() - start_time,
                                        "args": args,
                                        "kwargs": kwargs,
                                        **event_metadata,
                                    }
                                    event = EventData.create(name, content=result, metadata=meta)
                                    self.node.run_background(self.emit_event(event))
                            except Exception as e:
                                if self.enabled:
                                    meta = {
                                        "status": "error",
                                        "error": str(e),
                                        "duration": datetime.now() - start_time,
                                        "args": args,
                                        "kwargs": kwargs,
                                        **event_metadata,
                                    }
                                    event = EventData.create(name, content=str(e), metadata=meta)
                                    self.node.run_background(self.emit_event(event))
                                raise
                            else:
                                return result
                
                        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
                
                    return decorator
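
The listing above can be tried out without the rest of the library. The following is a minimal sketch, not the library's implementation: `MiniTracker` and its `_record` helper are hypothetical stand-ins that append plain dicts to a list instead of building `EventData` objects and emitting them through `emit_event`. Only the decorator shape (name fallback, success/error metadata, sync/async dispatch) mirrors the source shown above.

```python
from __future__ import annotations

import asyncio
import inspect
from datetime import datetime
from functools import wraps
from typing import Any, Callable


class MiniTracker:
    """Hypothetical stand-in for EventManager: stores events in a list."""

    def __init__(self) -> None:
        self.enabled = True
        self.events: list[dict[str, Any]] = []

    def _record(self, name: str, content: Any, meta: dict[str, Any]) -> None:
        # Assumption: a plain dict replaces EventData.create(...) + emit_event(...).
        if self.enabled:
            self.events.append({"name": name, "content": content, "metadata": meta})

    def track(
        self, event_name: str | None = None, **event_metadata: Any
    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            # Fall back to the function's own name when no event name is given.
            name = event_name or func.__name__

            def meta(status: str, start: datetime) -> dict[str, Any]:
                return {
                    "status": status,
                    "duration": datetime.now() - start,
                    **event_metadata,
                }

            @wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                start = datetime.now()
                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    self._record(name, str(e), meta("error", start))
                    raise  # the caller still sees the original exception
                self._record(name, result, meta("success", start))
                return result

            @wraps(func)
            def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                start = datetime.now()
                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    self._record(name, str(e), meta("error", start))
                    raise
                self._record(name, result, meta("success", start))
                return result

            # Pick the wrapper that matches the decorated function's kind.
            return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper

        return decorator


tracker = MiniTracker()


@tracker.track("user_search", source="docs")
async def search_docs(query: str) -> list[str]:
    return [f"result for {query}"]


@tracker.track()  # no name given, so the event is called "fail"
def fail() -> None:
    raise ValueError("boom")


results = asyncio.run(search_docs("events"))
try:
    fail()
except ValueError:
    pass  # the error event was recorded before the exception propagated
```

After running this, `tracker.events` holds one success event named `user_search` and one error event named `fail`. Because `**event_metadata` is spread last, a caller-supplied key such as `status` would override the built-in one, which matches the ordering in the source listing.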
                

                EventObserver dataclass

                Registered event observer.

                Source code in src/llmling_agent/messaging/event_manager.py
                @dataclass
                class EventObserver:
                    """Registered event observer."""
                
                    callback: Callable[..., Any]
                    interval: timedelta | None = None
                    last_run: datetime | None = None
                
                    async def __call__(self, event: EventData):
                        """Handle an event."""
                        try:
                            await execute(self.callback, event)
                        except Exception:
                            logger.exception("Error in event observer")
                

                __call__ async

                __call__(event: EventData)
                

                Handle an event.

                Source code in src/llmling_agent/messaging/event_manager.py
                async def __call__(self, event: EventData):
                    """Handle an event."""
                    try:
                        await execute(self.callback, event)
                    except Exception:
                        logger.exception("Error in event observer")
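
The effect of the `try`/`except` in `__call__` is that a failing callback is logged rather than propagated, so one broken observer does not stop the others from receiving the event. The sketch below illustrates this in isolation. It makes two assumptions: `execute` here is a hypothetical stand-in that calls the callback and awaits the result if it is awaitable (the library's own helper may behave differently, for example by running sync callables in a thread), and `Observer` is a reduced copy of `EventObserver` with a plain string in place of `EventData`.

```python
from __future__ import annotations

import asyncio
import inspect
import logging
from dataclasses import dataclass
from typing import Any, Callable

logger = logging.getLogger(__name__)


async def execute(callback: Callable[..., Any], *args: Any) -> Any:
    """Hypothetical stand-in: call `callback`, awaiting the result if needed."""
    result = callback(*args)
    if inspect.isawaitable(result):
        result = await result
    return result


@dataclass
class Observer:
    """Reduced copy of EventObserver for illustration."""

    callback: Callable[..., Any]

    async def __call__(self, event: Any) -> None:
        try:
            await execute(self.callback, event)
        except Exception:
            # Log and swallow, so later observers still run.
            logger.exception("Error in event observer")


seen: list[str] = []


def good(event: str) -> None:
    seen.append(event)


async def bad(event: str) -> None:
    raise RuntimeError("observer failed")


async def main() -> None:
    # The failing observer runs first; the second one is still notified.
    for observer in (Observer(bad), Observer(good)):
        await observer("file_changed")


asyncio.run(main())
```

The failure from `bad` appears in the log with its traceback, while `seen` still ends up containing the event delivered to `good`.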