Skip to content

eventnode

Class info

Classes

Name Children Inherits
ChatMessage
llmling_agent.messaging.messages
Common message format for all UI types.
    Event
    llmling_agent.messaging.eventnode
    Base class for event implementations.
      EventNode
      llmling_agent.messaging.eventnode
      Base class for event sources.
        MessageEmitter
        llmling_agent.messaging.messageemitter
        Base class for all message processing nodes.
        MessageStats
        llmling_agent.talk.stats
        Statistics for a single connection.

        🛈 DocStrings

        Event source implementation.

        Event

        Base class for event implementations.

        Handles monitoring for and converting specific types of events. Generically typed with the type of event data produced.

        Source code in src/llmling_agent/messaging/eventnode.py
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        class Event[TEventData]:
            """Base class for event implementations.
        
            Handles monitoring for and converting specific types of events.
            Generically typed with the type of event data produced.
            """
        
            def __init__(self):
                self._stop_event: asyncio.Event = asyncio.Event()
        
            async def __aenter__(self) -> Self:
                """Set up event resources."""
                return self
        
            async def __aexit__(
                self,
                exc_type: type[BaseException] | None,
                exc_val: BaseException | None,
                exc_tb: TracebackType | None,
            ) -> None:
                """Clean up event resources."""
                self._stop_event.set()
        
            @abstractmethod
            def create_monitor(self) -> AsyncGenerator[Any, None]:
                """Create async generator that yields raw event data.
        
                Yields:
                    Raw event data that will be passed to convert_data
                """
                raise NotImplementedError
        
            @abstractmethod
            def convert_data(self, raw_data: Any) -> TEventData:
                """Convert raw event data to typed event data.
        
                Args:
                    raw_data: Data from create_monitor
        
                Returns:
                    Typed event data
                """
                raise NotImplementedError
        

        __aenter__ async

        __aenter__() -> Self
        

        Set up event resources.

        Source code in src/llmling_agent/messaging/eventnode.py
        32
        33
        34
        async def __aenter__(self) -> Self:
            """Set up event resources."""
            return self
        

        __aexit__ async

        __aexit__(
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None
        

        Clean up event resources.

        Source code in src/llmling_agent/messaging/eventnode.py
        36
        37
        38
        39
        40
        41
        42
        43
        async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None:
            """Clean up event resources."""
            self._stop_event.set()
        

        convert_data abstractmethod

        convert_data(raw_data: Any) -> TEventData
        

        Convert raw event data to typed event data.

        Parameters:

        Name Type Description Default
        raw_data Any

        Data from create_monitor

        required

        Returns:

        Type Description
        TEventData

        Typed event data

        Source code in src/llmling_agent/messaging/eventnode.py
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        @abstractmethod
        def convert_data(self, raw_data: Any) -> TEventData:
            """Turn one raw item into typed event data.

            Implementations override this; the base version always raises.

            Args:
                raw_data: An item produced by ``create_monitor``.

            Returns:
                The typed event data.

            Raises:
                NotImplementedError: Always, in this base implementation.
            """
            raise NotImplementedError
        

        create_monitor abstractmethod

        create_monitor() -> AsyncGenerator[Any, None]
        

        Create async generator that yields raw event data.

        Yields:

        Type Description
        AsyncGenerator[Any, None]

        Raw event data that will be passed to convert_data

        Source code in src/llmling_agent/messaging/eventnode.py
        45
        46
        47
        48
        49
        50
        51
        52
        @abstractmethod
        def create_monitor(self) -> AsyncGenerator[Any, None]:
            """Build the async generator that produces raw event data.

            Implementations override this; the base version always raises.

            Yields:
                Raw items, each meant to be handed to ``convert_data``.

            Raises:
                NotImplementedError: Always, in this base implementation.
            """
            raise NotImplementedError
        

        EventNode

        Bases: MessageEmitter[None, TEventData]

        Base class for event sources.

        An event source monitors for events and emits them as messages. Generically typed with the type of event data it produces.

        Source code in src/llmling_agent/messaging/eventnode.py
         67
         68
         69
         70
         71
         72
         73
         74
         75
         76
         77
         78
         79
         80
         81
         82
         83
         84
         85
         86
         87
         88
         89
         90
         91
         92
         93
         94
         95
         96
         97
         98
         99
        100
        101
        102
        103
        104
        105
        106
        107
        108
        109
        110
        111
        112
        113
        114
        115
        116
        117
        118
        119
        120
        121
        122
        123
        124
        125
        126
        127
        128
        129
        130
        131
        132
        133
        134
        135
        136
        137
        138
        139
        class EventNode[TEventData](MessageEmitter[None, TEventData]):
            """Base class for event sources.
        
            An event source monitors for events and emits them as messages.
            Generically typed with the type of event data it produces.
            """
        
            def __init__(
                self,
                event: Event[TEventData],
                name: str | None = None,
                context: NodeContext | None = None,
                mcp_servers: Sequence[str | MCPServerConfig] | None = None,
                description: str | None = None,
            ) -> None:
                """Initialize event node.
        
                Args:
                    event: Event implementation
                    name: Optional name for this node
                    context: Optional node context
                    mcp_servers: Optional MCP server configurations
                    description: Optional description
                """
                super().__init__(name=name, context=context, description=description)
                self.event = event
                self._running = False
        
            async def __aenter__(self) -> Self:
                """Initialize event resources and start monitoring."""
                await super().__aenter__()
                await self.event.__aenter__()
                # Start monitoring after everything is initialized
                self.create_task(self.start())  # Non-blocking
                return self
        
            async def __aexit__(
                self,
                exc_type: type[BaseException] | None,
                exc_val: BaseException | None,
                exc_tb: TracebackType | None,
            ) -> None:
                """Stop monitoring and clean up resources."""
                # First stop monitoring
                await self.stop()
                # Then cleanup in reverse order
                await self.event.__aexit__(exc_type, exc_val, exc_tb)
                await super().__aexit__(exc_type, exc_val, exc_tb)
        
            async def start(self) -> None:
                """Start monitoring for events."""
                self._running = True
                try:
                    async for data in self.event.create_monitor():
                        if not self._running:
                            break
                        await self.run(data)
                finally:
                    self._running = False
        
            async def stop(self) -> None:
                """Stop monitoring for events."""
                self._running = False
        
            @property
            def stats(self) -> MessageStats:
                return MessageStats(messages=self._logger.message_history)
        
            async def _run(self, *content: Any, **kwargs: Any) -> ChatMessage[TEventData]:
                """Convert event data to message."""
                result = await self.event.convert_data(content[0])
                meta = kwargs.get("metadata", {})
                return ChatMessage(content=result, role="system", name=self.name, metadata=meta)
        

        __aenter__ async

        __aenter__() -> Self
        

        Initialize event resources and start monitoring.

        Source code in src/llmling_agent/messaging/eventnode.py
         95
         96
         97
         98
         99
        100
        101
        async def __aenter__(self) -> Self:
            """Enter the node's async context and launch the monitor loop.

            The base class context is entered first, then the wrapped event's;
            monitoring is scheduled only after both have completed.
            """
            await super().__aenter__()
            await self.event.__aenter__()
            # The monitor coroutine is handed to create_task rather than awaited,
            # so entering the context does not wait for the event stream.
            monitor = self.start()
            self.create_task(monitor)
            return self
        

        __aexit__ async

        __aexit__(
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None
        

        Stop monitoring and clean up resources.

        Source code in src/llmling_agent/messaging/eventnode.py
        103
        104
        105
        106
        107
        108
        109
        110
        111
        112
        113
        114
        async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None:
            """Leave the node's async context: stop monitoring, then release resources.

            Teardown mirrors setup in reverse: the monitor loop is told to stop,
            the wrapped event's context is exited, and the base class context is
            exited last. The same exception details are passed to both.
            """
            exc_details = (exc_type, exc_val, exc_tb)
            await self.stop()
            await self.event.__aexit__(*exc_details)
            await super().__aexit__(*exc_details)
        

        __init__

        __init__(
            event: Event[TEventData],
            name: str | None = None,
            context: NodeContext | None = None,
            mcp_servers: Sequence[str | MCPServerConfig] | None = None,
            description: str | None = None,
        ) -> None
        

        Initialize event node.

        Parameters:

        Name Type Description Default
        event Event[TEventData]

        Event implementation

        required
        name str | None

        Optional name for this node

        None
        context NodeContext | None

        Optional node context

        None
        mcp_servers Sequence[str | MCPServerConfig] | None

        Optional MCP server configurations

        None
        description str | None

        Optional description

        None
        Source code in src/llmling_agent/messaging/eventnode.py
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        88
        89
        90
        91
        92
        93
        def __init__(
            self,
            event: Event[TEventData],
            name: str | None = None,
            context: NodeContext | None = None,
            mcp_servers: Sequence[str | MCPServerConfig] | None = None,
            description: str | None = None,
        ) -> None:
            """Set up the node around an event implementation.

            Args:
                event: Event implementation this node monitors
                name: Optional name for this node
                context: Optional node context
                mcp_servers: Optional MCP server configurations
                description: Optional description
            """
            # NOTE(review): ``mcp_servers`` is accepted but never referenced in
            # this initializer -- confirm whether it should be forwarded to the
            # base class.
            super().__init__(name=name, context=context, description=description)
            # Monitoring is not active until start() runs.
            self._running = False
            self.event = event
        

        _run async

        _run(*content: Any, **kwargs: Any) -> ChatMessage[TEventData]
        

        Convert event data to message.

        Source code in src/llmling_agent/messaging/eventnode.py
        135
        136
        137
        138
        139
        async def _run(self, *content: Any, **kwargs: Any) -> ChatMessage[TEventData]:
            """Convert event data to message.

            Args:
                *content: Raw event data; only the first item is converted.
                **kwargs: An optional ``metadata`` entry is attached to the
                    message; an empty dict is used when it is absent.

            Returns:
                A ``system`` role message, named after this node, carrying the
                converted event data.

            Raises:
                IndexError: If no positional content is given.
            """
            import inspect

            result = self.event.convert_data(content[0])
            # Event.convert_data is declared as a plain (synchronous) method, so
            # awaiting its result unconditionally fails for ordinary values.
            # Await only when an implementation actually returned an awaitable,
            # which keeps async overrides working as before.
            if inspect.isawaitable(result):
                result = await result
            meta = kwargs.get("metadata", {})
            return ChatMessage(content=result, role="system", name=self.name, metadata=meta)
        

        start async

        start() -> None
        

        Start monitoring for events.

        Source code in src/llmling_agent/messaging/eventnode.py
        116
        117
        118
        119
        120
        121
        122
        123
        124
        125
        async def start(self) -> None:
            """Run the monitor loop until the stream ends or the node is stopped.

            Every item produced by the event's monitor is passed to ``run``. The
            running flag is checked once per received item, and it is always
            reset when the loop ends, whether normally or through an exception.
            """
            self._running = True
            try:
                stream = self.event.create_monitor()
                async for payload in stream:
                    if self._running:
                        await self.run(payload)
                        continue
                    # A stop was requested while waiting for this item.
                    break
            finally:
                self._running = False
        

        stop async

        stop() -> None
        

        Stop monitoring for events.

        Source code in src/llmling_agent/messaging/eventnode.py
        127
        128
        129
        async def stop(self) -> None:
            """Ask the monitor loop to stop by clearing the running flag."""
            self._running = False