Skip to content

executor

Class info

Classes

Name Children Inherits
ExecutionError
llmling_agent.running.executor
Raised when function execution fails.

    🛈 DocStrings

    Function execution management.

    ExecutionError

    Bases: Exception

    Raised when function execution fails.

    Source code in src/llmling_agent/running/executor.py
    26
    27
    class ExecutionError(Exception):
        """Raised when function execution fails.

        ``execute_single`` converts any exception raised while preparing,
        running or validating a node function into this type, chaining the
        original error as ``__cause__``.
        """
    

    discover_functions

    discover_functions(path: str | PathLike[str]) -> list[NodeFunction]
    

    Find all node functions in a module.

    Parameters:

    Name Type Description Default
    path str | PathLike[str]

    Path to Python module file

    required

    Returns:

    Type Description
    list[NodeFunction]

    List of discovered node functions

    Raises:

    Type Description
    ImportError

    If module cannot be imported

    ValueError

    If path is invalid

    Source code in src/llmling_agent/running/executor.py
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    def discover_functions(path: str | os.PathLike[str]) -> list[NodeFunction]:
        """Load a Python file as a module and collect its node functions.

        A member counts as a node function when it carries a
        ``_node_function`` attribute. Members are returned in the order
        produced by ``inspect.getmembers``.

        Args:
            path: Path to Python module file

        Returns:
            List of discovered node functions

        Raises:
            ImportError: If module cannot be imported
            ValueError: If path is invalid
        """
        module_path = _validate_path(path)

        # Build an import spec straight from the file; the module is named
        # after the file stem.
        spec = importlib.util.spec_from_file_location(module_path.stem, module_path)
        if not (spec and spec.loader):
            msg = f"Could not load module: {path}"
            raise ImportError(msg)

        # Executing the module runs its top-level code, which is what makes
        # the marked functions available as module members.
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        discovered: list[NodeFunction] = []
        for _, member in inspect.getmembers(module):
            if hasattr(member, "_node_function"):
                discovered.append(member)
        return discovered
    

    execute_functions async

    execute_functions(
        functions: list[NodeFunction],
        pool: AgentPool,
        inputs: dict[str, Any] | None = None,
        parallel: bool = False,
    ) -> dict[str, Any]
    

    Execute discovered functions in the right order.

    Source code in src/llmling_agent/running/executor.py
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    async def execute_functions(
        functions: list[NodeFunction],
        pool: AgentPool,
        inputs: dict[str, Any] | None = None,
        parallel: bool = False,
    ) -> dict[str, Any]:
        """Execute discovered functions in the right order.

        The functions are ordered with ``_sort_functions`` and checked with
        ``_validate_dependency_types`` before anything runs.

        Args:
            functions: Node functions to run
            pool: Agent pool handed to every ``execute_single`` call
            inputs: Optional input overrides forwarded to every function
            parallel: When true, run the groups from ``_group_parallel`` one
                after another, awaiting the members of each group together;
                otherwise run the functions one at a time

        Returns:
            Mapping of function name to the result of that function
        """
        logger.info("Executing %d functions (parallel=%s)", len(functions), parallel)

        # Sort by order/dependencies, then check the dependency typing
        ordered = _sort_functions(functions)
        _validate_dependency_types(ordered)

        results: dict[str, Any] = {}

        if not parallel:
            # Sequential mode: each result is visible to the next function
            for func in ordered:
                name, value = await execute_single(func, pool, results, inputs)
                results[name] = value
            return results

        groups = _group_parallel(ordered)
        group_count = len(groups)
        for number, group in enumerate(groups, start=1):
            member_names = [member.name for member in group]
            logger.debug(
                "Executing parallel group %d/%d: %s", number, group_count, member_names
            )

            # Results of earlier groups are what this group may depend on
            logger.debug("Available results: %s", sorted(results))

            pending = [execute_single(member, pool, results, inputs) for member in group]
            finished = await asyncio.gather(*pending)

            # Merge only once the whole group is done
            results.update(dict(finished))
            logger.debug("Group %d complete", number)

            # Small delay between groups to ensure timing separation
            # (skipped after the final group)
            if number < group_count:
                await asyncio.sleep(0.02)  # 20ms between groups

        return results
    

    execute_single async

    execute_single(
        func: NodeFunction,
        pool: AgentPool,
        available_results: dict[str, Any],
        inputs: dict[str, Any] | None = None,
    ) -> tuple[str, Any]
    

    Execute a single function.

    Parameters:

    Name Type Description Default
    func NodeFunction

    Function to execute

    required
    pool AgentPool

    Agent pool for injection

    required
    available_results dict[str, Any]

    Results from previous functions

    required
    inputs dict[str, Any] | None

    Optional input overrides

    None

    Returns:

    Type Description
    tuple[str, Any]

    Tuple of (function name, result)

    Raises:

    Type Description
    ExecutionError

    If execution fails

    Source code in src/llmling_agent/running/executor.py
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    async def execute_single(
        func: NodeFunction,
        pool: AgentPool,
        available_results: dict[str, Any],
        inputs: dict[str, Any] | None = None,
    ) -> tuple[str, Any]:
        """Run one node function and return its name together with its result.

        Keyword arguments are assembled from the function's default inputs,
        then the ``inputs`` overrides, then the results of its dependencies.
        Dependency values and the return value are checked against the
        function's type hints when a hint is present.

        Args:
            func: Function to execute
            pool: Agent pool for injection
            available_results: Results from previous functions
            inputs: Optional input overrides

        Returns:
            Tuple of (function name, result)

        Raises:
            ExecutionError: If execution fails, including a missing
                dependency result; the triggering exception is chained
        """
        logger.debug("Executing %s", func.name)
        try:
            # Defaults first; explicit inputs take precedence over them
            call_kwargs = func.default_inputs.copy()
            if inputs:
                call_kwargs.update(inputs)

            annotations = get_type_hints(func.func)

            # Feed in each dependency's result, validating hinted ones
            for dep_name in func.depends_on:
                if dep_name not in available_results:
                    msg = f"Missing result from {dep_name}"
                    raise ExecutionError(msg)  # noqa: TRY301

                dep_value = available_results[dep_name]
                if dep_name in annotations:
                    expected = annotations[dep_name]
                    _validate_value_type(dep_value, expected, func.name, dep_name)
                call_kwargs[dep_name] = dep_value

            # Execute with node injection
            result = await with_nodes(pool)(func.func)(**call_kwargs)

            # Only a hinted return type is validated
            if "return" in annotations:
                _validate_value_type(result, annotations["return"], func.name, "return")
        except Exception as e:
            # Everything, including the ExecutionError raised above, is
            # reported under this function's name with the cause chained.
            msg = f"Error executing {func.name}: {e}"
            raise ExecutionError(msg) from e
        return func.name, result