Skip to content

slashed_agent

Class info

Classes

Name Children Inherits
CommandCompleteEvent
llmling_agent.agent.events
Event indicating slash command execution is complete.
    CommandOutputEvent
    llmling_agent.agent.events
    Event for slash command output.
      SlashedAgent
      llmling_agent.agent.slashed_agent
      Wrapper around Agent that handles slash commands in streams.

        🛈 DocStrings

        Slash command wrapper for Agent that injects command events into streams.

        SlashedAgent

        Wrapper around Agent that handles slash commands in streams.

        Uses the "commands first" strategy from the ACP adapter: 1. Execute all slash commands first 2. Then process remaining content through wrapped agent 3. If only commands, end without LLM processing

        Source code in src/llmling_agent/agent/slashed_agent.py
         49
         50
         51
         52
         53
         54
         55
         56
         57
         58
         59
         60
         61
         62
         63
         64
         65
         66
         67
         68
         69
         70
         71
         72
         73
         74
         75
         76
         77
         78
         79
         80
         81
         82
         83
         84
         85
         86
         87
         88
         89
         90
         91
         92
         93
         94
         95
         96
         97
         98
         99
        100
        101
        102
        103
        104
        105
        106
        107
        108
        109
        110
        111
        112
        113
        114
        115
        116
        117
        118
        119
        120
        121
        122
        123
        124
        125
        126
        127
        128
        129
        130
        131
        132
        133
        134
        135
        136
        137
        138
        139
        140
        141
        142
        143
        144
        145
        146
        147
        148
        149
        150
        151
        152
        153
        154
        155
        156
        157
        158
        159
        160
        161
        162
        163
        164
        165
        166
        167
        168
        169
        170
        171
        172
        173
        174
        175
        176
        177
        178
        179
        180
        181
        182
        183
        184
        185
        186
        187
        188
        189
        190
        191
        192
        193
        194
        195
        196
        197
        198
        199
        200
        201
        202
        203
        204
        205
        206
        207
        208
        209
        210
        211
        212
        213
        214
        215
        216
        217
        218
        219
        220
        221
        222
        223
        224
        225
        226
        227
        228
        229
        230
        231
        232
        233
        234
        class SlashedAgent[TDeps, OutputDataT]:
            """Wrapper around Agent that handles slash commands in streams.
        
            Uses the "commands first" strategy from the ACP adapter:
            1. Execute all slash commands first
            2. Then process remaining content through wrapped agent
            3. If only commands, end without LLM processing
            """
        
            def __init__(
                self,
                agent: Agent[TDeps, OutputDataT],
                command_store: CommandStore | None = None,
                *,
                context_data_factory: Callable[[], Any] | None = None,
            ) -> None:
                """Initialize with wrapped agent and command store.
        
                Args:
                    agent: The agent to wrap
                    command_store: Command store for slash commands (creates default if None)
                    context_data_factory: Optional factory for creating command context data
                """
                self.agent = agent
                self._context_data_factory = context_data_factory
                self._event_queue: asyncio.Queue[CommandStoreEvent] | None = None
        
                # Create store with our streaming event handler
                if command_store is None:
                    from slashed import CommandStore
        
                    from llmling_agent_commands import get_commands
        
                    self.command_store = CommandStore(
                        event_handler=self._bridge_events_to_queue,
                        commands=get_commands(),
                    )
        
                else:
                    self.command_store = command_store
        
            async def _bridge_events_to_queue(self, event: CommandStoreEvent) -> None:
                """Bridge store events to async queue during command execution."""
                if self._event_queue:
                    await self._event_queue.put(event)
        
            def _is_slash_command(self, text: str) -> bool:
                """Check if text starts with a slash command.
        
                Args:
                    text: Text to check
        
                Returns:
                    True if text is a slash command
                """
                return bool(SLASH_PATTERN.match(text.strip()))
        
            async def _execute_slash_command_streaming(
                self, command_text: str
            ) -> AsyncGenerator[CommandOutputEvent | CommandCompleteEvent]:
                """Execute a single slash command and yield events as they happen.
        
                Args:
                    command_text: Full command text including slash
        
                Yields:
                    Command output and completion events
                """
                parsed = _parse_slash_command(command_text)
                if not parsed:
                    logger.warning("Invalid slash command", command=command_text)
                    yield CommandCompleteEvent(command="unknown", success=False)
                    return
        
                cmd_name, args = parsed
        
                # Set up event queue for this command execution
                self._event_queue = asyncio.Queue()
                context_data = (  # Create command context
                    self._context_data_factory() if self._context_data_factory else self.agent.context
                )
        
                cmd_ctx = self.command_store.create_context(data=context_data)
                command_str = f"{cmd_name} {args}".strip()
                execute_task = asyncio.create_task(self.command_store.execute_command(command_str, cmd_ctx))
        
                success = True
                try:
                    # Yield events from queue as command runs
                    while not execute_task.done():
                        try:
                            # Wait for events with short timeout to check task completion
                            event = await asyncio.wait_for(self._event_queue.get(), timeout=0.1)
                            # Convert store events to our stream events
                            match event:
                                case SlashedCommandOutputEvent(output=output):
                                    yield CommandOutputEvent(command=cmd_name, output=output)
                                case CommandExecutedEvent(success=False, error=error) if error:
                                    yield CommandOutputEvent(
                                        command=cmd_name,
                                        output=f"Command error: {error}",
                                    )
                                    success = False
        
                        except TimeoutError:
                            # No events in queue, continue checking if command is done
                            continue
        
                    # Ensure command task completes and handle any remaining events
                    try:
                        await execute_task
                    except Exception as e:
                        logger.exception("Command execution failed", command=cmd_name)
                        success = False
                        yield CommandOutputEvent(command=cmd_name, output=f"Command error: {e}")
        
                    # Drain any remaining events from queue
                    while not self._event_queue.empty():
                        try:
                            match self._event_queue.get_nowait():
                                case SlashedCommandOutputEvent(output=output):
                                    yield CommandOutputEvent(command=cmd_name, output=output)
                        except asyncio.QueueEmpty:
                            break
        
                    # Always yield completion event
                    yield CommandCompleteEvent(command=cmd_name, success=success)
        
                finally:
                    # Clean up event queue
                    self._event_queue = None
        
            async def run_stream(
                self,
                *prompts: PromptCompatible,
                **kwargs: Any,
            ) -> AsyncGenerator[SlashedAgentStreamEvent[OutputDataT]]:
                """Run agent with slash command support.
        
                Separates slash commands from regular prompts, executes commands first,
                then processes remaining content through the wrapped agent.
        
                Args:
                    *prompts: Input prompts (may include slash commands)
                    **kwargs: Additional arguments passed to agent.run_stream
        
                Yields:
                    Stream events from command execution and agent processing
                """
                # Separate slash commands from regular content
                commands: list[str] = []
                regular_prompts: list[Any] = []
        
                for prompt in prompts:
                    if isinstance(prompt, str) and self._is_slash_command(prompt):
                        logger.debug("Found slash command", command=prompt)
                        commands.append(prompt.strip())
                    else:
                        regular_prompts.append(prompt)
        
                # Execute all commands first with streaming
                if commands:
                    for command in commands:
                        logger.info("Processing slash command", command=command)
                        async for cmd_event in self._execute_slash_command_streaming(command):
                            yield cmd_event
        
                # If we have regular content, process it through the agent
                if regular_prompts:
                    logger.debug("Processing prompts through agent", num_prompts=len(regular_prompts))
                    async for event in self.agent.run_stream(*regular_prompts, **kwargs):
                        yield event
        
                # If we only had commands and no regular content, we're done
                # (no additional events needed)
        
            def __getattr__(self, name: str) -> Any:
                """Delegate attribute access to wrapped agent.
        
                Args:
                    name: Attribute name
        
                Returns:
                    Attribute value from wrapped agent
                """
                return getattr(self.agent, name)
        

        __getattr__

        __getattr__(name: str) -> Any
        

        Delegate attribute access to wrapped agent.

        Parameters:

        Name Type Description Default
        name str

        Attribute name

        required

        Returns:

        Type Description
        Any

        Attribute value from wrapped agent

        Source code in src/llmling_agent/agent/slashed_agent.py
        225
        226
        227
        228
        229
        230
        231
        232
        233
        234
        def __getattr__(self, name: str) -> Any:
            """Delegate attribute access to wrapped agent.
        
            Args:
                name: Attribute name
        
            Returns:
                Attribute value from wrapped agent
            """
            return getattr(self.agent, name)
        

        __init__

        __init__(
            agent: Agent[TDeps, OutputDataT],
            command_store: CommandStore | None = None,
            *,
            context_data_factory: Callable[[], Any] | None = None
        ) -> None
        

        Initialize with wrapped agent and command store.

        Parameters:

        Name Type Description Default
        agent Agent[TDeps, OutputDataT]

        The agent to wrap

        required
        command_store CommandStore | None

        Command store for slash commands (creates default if None)

        None
        context_data_factory Callable[[], Any] | None

        Optional factory for creating command context data

        None
        Source code in src/llmling_agent/agent/slashed_agent.py
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        88
        def __init__(
            self,
            agent: Agent[TDeps, OutputDataT],
            command_store: CommandStore | None = None,
            *,
            context_data_factory: Callable[[], Any] | None = None,
        ) -> None:
            """Initialize with wrapped agent and command store.
        
            Args:
                agent: The agent to wrap
                command_store: Command store for slash commands (creates default if None)
                context_data_factory: Optional factory for creating command context data
            """
            self.agent = agent
            self._context_data_factory = context_data_factory
            self._event_queue: asyncio.Queue[CommandStoreEvent] | None = None
        
            # Create store with our streaming event handler
            if command_store is None:
                from slashed import CommandStore
        
                from llmling_agent_commands import get_commands
        
                self.command_store = CommandStore(
                    event_handler=self._bridge_events_to_queue,
                    commands=get_commands(),
                )
        
            else:
                self.command_store = command_store
        

        run_stream async

        run_stream(
            *prompts: PromptCompatible, **kwargs: Any
        ) -> AsyncGenerator[SlashedAgentStreamEvent[OutputDataT]]
        

        Run agent with slash command support.

        Separates slash commands from regular prompts, executes commands first, then processes remaining content through the wrapped agent.

        Parameters:

        Name Type Description Default
        *prompts PromptCompatible

        Input prompts (may include slash commands)

        ()
        **kwargs Any

        Additional arguments passed to agent.run_stream

        {}

        Yields:

        Type Description
        AsyncGenerator[SlashedAgentStreamEvent[OutputDataT]]

        Stream events from command execution and agent processing

        Source code in src/llmling_agent/agent/slashed_agent.py
        181
        182
        183
        184
        185
        186
        187
        188
        189
        190
        191
        192
        193
        194
        195
        196
        197
        198
        199
        200
        201
        202
        203
        204
        205
        206
        207
        208
        209
        210
        211
        212
        213
        214
        215
        216
        217
        218
        219
        220
        async def run_stream(
            self,
            *prompts: PromptCompatible,
            **kwargs: Any,
        ) -> AsyncGenerator[SlashedAgentStreamEvent[OutputDataT]]:
            """Run agent with slash command support.
        
            Separates slash commands from regular prompts, executes commands first,
            then processes remaining content through the wrapped agent.
        
            Args:
                *prompts: Input prompts (may include slash commands)
                **kwargs: Additional arguments passed to agent.run_stream
        
            Yields:
                Stream events from command execution and agent processing
            """
            # Separate slash commands from regular content
            commands: list[str] = []
            regular_prompts: list[Any] = []
        
            for prompt in prompts:
                if isinstance(prompt, str) and self._is_slash_command(prompt):
                    logger.debug("Found slash command", command=prompt)
                    commands.append(prompt.strip())
                else:
                    regular_prompts.append(prompt)
        
            # Execute all commands first with streaming
            if commands:
                for command in commands:
                    logger.info("Processing slash command", command=command)
                    async for cmd_event in self._execute_slash_command_streaming(command):
                        yield cmd_event
        
            # If we have regular content, process it through the agent
            if regular_prompts:
                logger.debug("Processing prompts through agent", num_prompts=len(regular_prompts))
                async for event in self.agent.run_stream(*regular_prompts, **kwargs):
                    yield event