Skip to content

progress_executor

Class info

Classes

Name Children Inherits
ProgressTrackingExecutor
llmling_agent.resource_providers.codemode.progress_executor
Execute Python code statement-by-statement with automatic progress reporting.

    🛈 DocStrings

    Statement-by-statement execution with automatic progress tracking.

    ProgressTrackingExecutor

    Execute Python code statement-by-statement with automatic progress reporting.

    Source code in src/llmling_agent/resource_providers/codemode/progress_executor.py
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    class ProgressTrackingExecutor:
        """Execute Python code statement-by-statement with automatic progress reporting."""
    
        def __init__(
            self, progress_callback: ProgressCallback | None = None, step_delay: float = 0.01
        ):
            """Initialize executor.
    
            Args:
                progress_callback: Optional callback for progress reporting
                step_delay: Delay between statement executions (for async behavior)
            """
            self.globals: dict[str, Any] = {"__builtins__": __builtins__}
            self.locals: dict[str, Any] = {}
            self.progress_callback: ProgressCallback | None = progress_callback
            self.step_delay = step_delay
            self.total_statements = 0
            self.current_statement = 0
            self.start_time: float | None = None
    
        def update_namespace(self, namespace: dict[str, Any]) -> None:
            """Update the execution namespace with additional globals."""
            self.globals.update(namespace)
    
        def count_statements(self, code: str) -> int:
            """Count total statements in the code."""
            try:
                tree = ast.parse(code)
                return len(tree.body)
            except SyntaxError:
                return 0
    
        async def execute_with_progress(self, code: str) -> Any:
            """Execute code with automatic progress reporting.
    
            Args:
                code: Python source code to execute
    
            Returns:
                Result of code execution
            """
            self.total_statements = self.count_statements(code)
            self.current_statement = 0
            self.start_time = time.time()
    
            if self.total_statements == 0:
                return "No statements to execute"
    
            try:
                tree = ast.parse(code)
    
                for i, node in enumerate(tree.body):
                    self.current_statement = i + 1
    
                    # Report progress if callback available
                    if self.progress_callback:
                        node_type = type(node).__name__
                        message = f"Executing {node_type.lower()} statement"
    
                        await self.progress_callback(
                            float(self.current_statement),
                            float(self.total_statements),
                            message,
                        )
    
                    # Execute the statement
                    compiled = compile(
                        ast.Module(body=[node], type_ignores=[]), f"<statement_{i}>", "exec"
                    )
                    exec(compiled, self.globals, self.locals)
    
                    # Small delay for async behavior
                    if self.step_delay > 0:
                        await asyncio.sleep(self.step_delay)
    
                # Try to get result from main() function or return success
                if "main" in self.locals and callable(self.locals["main"]):
                    result = self.locals["main"]()
                    if inspect.iscoroutine(result):
                        result = await result
                    return result
            except Exception as e:
                # Report error with current progress
                if self.progress_callback:
                    await self.progress_callback(
                        float(self.current_statement),
                        float(self.total_statements),
                        f"Error: {str(e)[:50]}...",
                    )
                raise
            else:
                return "Code executed successfully"
    
        async def execute_statements(
            self, code: str
        ) -> AsyncIterator[tuple[str, dict[str, Any]]]:
            """Execute code statement by statement, yielding each executed statement.
    
            This method is useful for external progress tracking or debugging.
    
            Yields:
                (statement_source, metadata) tuples
            """
            self.total_statements = self.count_statements(code)
            self.current_statement = 0
            self.start_time = time.time()
    
            if self.total_statements == 0:
                return
    
            tree = ast.parse(code)
    
            for i, node in enumerate(tree.body):
                self.current_statement = i + 1
                statement_code = ast.unparse(node)
    
                # Create metadata
                elapsed_time = time.time() - (self.start_time or time.time())
                metadata = {
                    "statement_index": i,
                    "total_statements": self.total_statements,
                    "node_type": type(node).__name__,
                    "elapsed_seconds": elapsed_time,
                    "locals_before": dict(self.locals),
                }
    
                # Execute the statement
                try:
                    compiled = compile(
                        ast.Module(body=[node], type_ignores=[]), f"<statement_{i}>", "exec"
                    )
                    exec(compiled, self.globals, self.locals)
                    metadata["status"] = "success"
                except Exception as e:  # noqa: BLE001
                    metadata["status"] = "error"
                    metadata["error"] = str(e)
    
                metadata["locals_after"] = dict(self.locals)
    
                yield statement_code, metadata
    
                if self.step_delay > 0:
                    await asyncio.sleep(self.step_delay)
    

    __init__

    __init__(progress_callback: ProgressCallback | None = None, step_delay: float = 0.01)
    

    Initialize executor.

    Parameters:

    Name Type Description Default
    progress_callback ProgressCallback | None

    Optional callback for progress reporting

    None
    step_delay float

    Delay between statement executions (for async behavior)

    0.01
    Source code in src/llmling_agent/resource_providers/codemode/progress_executor.py
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    def __init__(
        self, progress_callback: ProgressCallback | None = None, step_delay: float = 0.01
    ):
        """Initialize executor.
    
        Args:
            progress_callback: Optional callback for progress reporting
            step_delay: Delay between statement executions (for async behavior)
        """
        self.globals: dict[str, Any] = {"__builtins__": __builtins__}
        self.locals: dict[str, Any] = {}
        self.progress_callback: ProgressCallback | None = progress_callback
        self.step_delay = step_delay
        self.total_statements = 0
        self.current_statement = 0
        self.start_time: float | None = None
    

    count_statements

    count_statements(code: str) -> int
    

    Count total statements in the code.

    Source code in src/llmling_agent/resource_providers/codemode/progress_executor.py
    42
    43
    44
    45
    46
    47
    48
    def count_statements(self, code: str) -> int:
        """Count total statements in the code."""
        try:
            tree = ast.parse(code)
            return len(tree.body)
        except SyntaxError:
            return 0
    

    execute_statements async

    execute_statements(code: str) -> AsyncIterator[tuple[str, dict[str, Any]]]
    

    Execute code statement by statement, yielding each executed statement.

    This method is useful for external progress tracking or debugging.

    Yields:

    Type Description
    AsyncIterator[tuple[str, dict[str, Any]]]

    (statement_source, metadata) tuples

    Source code in src/llmling_agent/resource_providers/codemode/progress_executor.py
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    async def execute_statements(
        self, code: str
    ) -> AsyncIterator[tuple[str, dict[str, Any]]]:
        """Execute code statement by statement, yielding each executed statement.
    
        This method is useful for external progress tracking or debugging.
    
        Yields:
            (statement_source, metadata) tuples
        """
        self.total_statements = self.count_statements(code)
        self.current_statement = 0
        self.start_time = time.time()
    
        if self.total_statements == 0:
            return
    
        tree = ast.parse(code)
    
        for i, node in enumerate(tree.body):
            self.current_statement = i + 1
            statement_code = ast.unparse(node)
    
            # Create metadata
            elapsed_time = time.time() - (self.start_time or time.time())
            metadata = {
                "statement_index": i,
                "total_statements": self.total_statements,
                "node_type": type(node).__name__,
                "elapsed_seconds": elapsed_time,
                "locals_before": dict(self.locals),
            }
    
            # Execute the statement
            try:
                compiled = compile(
                    ast.Module(body=[node], type_ignores=[]), f"<statement_{i}>", "exec"
                )
                exec(compiled, self.globals, self.locals)
                metadata["status"] = "success"
            except Exception as e:  # noqa: BLE001
                metadata["status"] = "error"
                metadata["error"] = str(e)
    
            metadata["locals_after"] = dict(self.locals)
    
            yield statement_code, metadata
    
            if self.step_delay > 0:
                await asyncio.sleep(self.step_delay)
    

    execute_with_progress async

    execute_with_progress(code: str) -> Any
    

    Execute code with automatic progress reporting.

    Parameters:

    Name Type Description Default
    code str

    Python source code to execute

    required

    Returns:

    Type Description
    Any

    Result of code execution

    Source code in src/llmling_agent/resource_providers/codemode/progress_executor.py
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    async def execute_with_progress(self, code: str) -> Any:
        """Execute code with automatic progress reporting.
    
        Args:
            code: Python source code to execute
    
        Returns:
            Result of code execution
        """
        self.total_statements = self.count_statements(code)
        self.current_statement = 0
        self.start_time = time.time()
    
        if self.total_statements == 0:
            return "No statements to execute"
    
        try:
            tree = ast.parse(code)
    
            for i, node in enumerate(tree.body):
                self.current_statement = i + 1
    
                # Report progress if callback available
                if self.progress_callback:
                    node_type = type(node).__name__
                    message = f"Executing {node_type.lower()} statement"
    
                    await self.progress_callback(
                        float(self.current_statement),
                        float(self.total_statements),
                        message,
                    )
    
                # Execute the statement
                compiled = compile(
                    ast.Module(body=[node], type_ignores=[]), f"<statement_{i}>", "exec"
                )
                exec(compiled, self.globals, self.locals)
    
                # Small delay for async behavior
                if self.step_delay > 0:
                    await asyncio.sleep(self.step_delay)
    
            # Try to get result from main() function or return success
            if "main" in self.locals and callable(self.locals["main"]):
                result = self.locals["main"]()
                if inspect.iscoroutine(result):
                    result = await result
                return result
        except Exception as e:
            # Report error with current progress
            if self.progress_callback:
                await self.progress_callback(
                    float(self.current_statement),
                    float(self.total_statements),
                    f"Error: {str(e)[:50]}...",
                )
            raise
        else:
            return "Code executed successfully"
    

    update_namespace

    update_namespace(namespace: dict[str, Any]) -> None
    

    Update the execution namespace with additional globals.

    Source code in src/llmling_agent/resource_providers/codemode/progress_executor.py
    38
    39
    40
    def update_namespace(self, namespace: dict[str, Any]) -> None:
        """Update the execution namespace with additional globals."""
        self.globals.update(namespace)
    

    run_me async

    run_me(ctx: AgentContext, run_context: RunContext)
    

    Test function using the unified progress system.

    Source code in src/llmling_agent/resource_providers/codemode/progress_executor.py
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
        async def run_me(ctx: AgentContext, run_context: RunContext):
            """Test function using the unified progress system."""
            code = """
    async def main():
        x = 10
        y = 20
        z = x + y
        print(f"Result: {z}")
        for i in range(3):
            print(f"Loop: {i}")
        return f"Completed calculation: {z}"
    """
    
            # Create contextual progress handler (like provider does)
            queuing_handler = create_queuing_progress_handler(ctx.agent._progress_queue)
    
            tool_name = run_context.tool_name or "run_me"
            tool_call_id = run_context.tool_call_id or ""
            tool_input = {"python_code": code}
    
            async def bound_progress(
                progress: float, total: float | None, message: str
            ) -> None:
                await queuing_handler(
                    progress, total, message, tool_name, tool_call_id, tool_input
                )
    
            executor = ProgressTrackingExecutor(bound_progress, step_delay=0.5)
            result = await executor.execute_with_progress(code)
            return result or "Code executed successfully"