Skip to content

running

Class info

Classes

Name Children Inherits
NodeFunction
llmling_agent.running.discovery
Metadata for a function that uses nodes.
    NodeInjectionError
    llmling_agent.running.injection
    Raised when agent injection fails.

      🛈 DocStrings

      Decorator auto-injection / run package.

      NodeFunction dataclass

      Metadata for a function that uses nodes.

      Source code in src/llmling_agent/running/discovery.py
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      @dataclass
      class NodeFunction:
          """Metadata for a function that uses nodes.

          ``name`` is always taken from ``func.__name__``. ``default_inputs`` is
          seeded with the parameter defaults found in the signature of ``func``;
          entries passed explicitly to the constructor take precedence over them.
          """

          func: AnyCallable
          """The actual function to execute."""

          depends_on: list[str] = field(default_factory=list)
          """Names of functions this one depends on."""

          deps: Any = None
          """Node dependencies."""

          default_inputs: dict[str, Any] = field(default_factory=dict)
          """Default parameter values (signature defaults plus explicit overrides)."""

          name: str = field(init=False)
          """Function name (from function.__name__)."""

          def __post_init__(self):
              """Set the name and collect default inputs from the function signature.

              Explicitly supplied ``default_inputs`` are merged on top of the
              signature defaults instead of being discarded.
              """
              self.name = self.func.__name__

              # Every parameter that declares a default contributes one entry
              signature_defaults = {
                  param_name: param.default
                  for param_name, param in inspect.signature(self.func).parameters.items()
                  if param.default is not param.empty
              }
              # Explicit entries win over what the signature declares
              self.default_inputs = {**signature_defaults, **self.default_inputs}

              msg = "Registered node function %s (deps=%s)"
              logger.debug(msg, self.name, self.depends_on)
      

      default_inputs class-attribute instance-attribute

      default_inputs: dict[str, Any] = field(default_factory=dict)
      

      Default parameter values.

      depends_on class-attribute instance-attribute

      depends_on: list[str] = field(default_factory=list)
      

      Names of functions this one depends on.

      deps class-attribute instance-attribute

      deps: Any = None
      

      Node dependencies.

      func instance-attribute

      func: AnyCallable
      

      The actual function to execute.

      name class-attribute instance-attribute

      name: str = field(init=False)
      

      Function name (from function.__name__).

      __post_init__

      __post_init__()
      

      Set name and extract default inputs from the function signature.

      Source code in src/llmling_agent/running/discovery.py
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      def __post_init__(self):
          """Set the name and collect default inputs from the function signature.

          Explicitly supplied ``default_inputs`` are merged on top of the
          signature defaults instead of being discarded.
          """
          self.name = self.func.__name__

          # Every parameter that declares a default contributes one entry
          signature_defaults = {
              param_name: param.default
              for param_name, param in inspect.signature(self.func).parameters.items()
              if param.default is not param.empty
          }
          # Explicit entries win over what the signature declares
          self.default_inputs = {**signature_defaults, **self.default_inputs}

          msg = "Registered node function %s (deps=%s)"
          logger.debug(msg, self.name, self.depends_on)
      

      NodeInjectionError

      Bases: Exception

      Raised when agent injection fails.

      Source code in src/llmling_agent/running/injection.py
      41
      42
      class NodeInjectionError(Exception):
          """Error signalling that agent injection failed."""
      

      execute_functions async

      execute_functions(
          functions: list[NodeFunction],
          pool: AgentPool,
          inputs: dict[str, Any] | None = None,
          parallel: bool = False,
      ) -> dict[str, Any]
      

      Execute discovered functions in the right order.

      Source code in src/llmling_agent/running/executor.py
      243
      244
      245
      246
      247
      248
      249
      250
      251
      252
      253
      254
      255
      256
      257
      258
      259
      260
      261
      262
      263
      264
      265
      266
      267
      268
      269
      270
      271
      272
      273
      274
      275
      276
      277
      278
      279
      280
      281
      282
      283
      284
      285
      async def execute_functions(
          functions: list[NodeFunction],
          pool: AgentPool,
          inputs: dict[str, Any] | None = None,
          parallel: bool = False,
      ) -> dict[str, Any]:
          """Execute discovered functions in the right order.

          The functions are sorted and their dependency types validated first.
          Without ``parallel`` they run one after another; with ``parallel`` they
          are grouped and each group is awaited concurrently, so results of
          earlier groups are available to later ones.

          Args:
              functions: Node functions to execute
              pool: Agent pool passed to every function execution
              inputs: Optional inputs passed to every function execution
              parallel: Whether to run grouped functions concurrently

          Returns:
              Dict mapping function names to their results
          """
          msg = "Executing %d functions (parallel=%s)"
          logger.info(msg, len(functions), parallel)
          results: dict[str, Any] = {}

          ordered = _sort_functions(functions)
          _validate_dependency_types(ordered)

          if not parallel:
              # Sequential path: each result is visible to the next function
              for node_func in ordered:
                  key, value = await execute_single(node_func, pool, results, inputs)
                  results[key] = value
              return results

          batches = _group_parallel(ordered)
          for position, batch in enumerate(batches, start=1):
              msg = "Executing parallel group %d/%d: %s"
              logger.debug(msg, position, len(batches), [f.name for f in batch])
              logger.debug("Available results: %s", sorted(results))

              # Launch the whole group at once and wait for all of it
              pending = [
                  execute_single(node_func, pool, results, inputs) for node_func in batch
              ]
              finished = await asyncio.gather(*pending)

              # Results only become visible once the entire group is done
              results.update(dict(finished))
              logger.debug("Group %d complete", position)

              # 20ms pause between groups, skipped after the last one
              if position < len(batches):
                  await asyncio.sleep(0.02)

          return results
      

      execute_single async

      execute_single(
          func: NodeFunction,
          pool: AgentPool,
          available_results: dict[str, Any],
          inputs: dict[str, Any] | None = None,
      ) -> tuple[str, Any]
      

      Execute a single function.

      Parameters:

      Name Type Description Default
      func NodeFunction

      Function to execute

      required
      pool AgentPool

      Agent pool for injection

      required
      available_results dict[str, Any]

      Results from previous functions

      required
      inputs dict[str, Any] | None

      Optional input overrides

      None

      Returns:

      Type Description
      tuple[str, Any]

      Tuple of (function name, result)

      Raises:

      Type Description
      ExecutionError

      If execution fails

      Source code in src/llmling_agent/running/executor.py
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      169
      170
      171
      172
      173
      174
      175
      176
      177
      178
      179
      180
      181
      182
      183
      184
      185
      186
      187
      188
      189
      190
      191
      192
      193
      194
      195
      196
      197
      198
      199
      200
      201
      202
      async def execute_single(
          func: NodeFunction,
          pool: AgentPool,
          available_results: dict[str, Any],
          inputs: dict[str, Any] | None = None,
      ) -> tuple[str, Any]:
          """Run one node function and return its name together with its result.

          Keyword arguments are assembled from the function's default inputs,
          then the given inputs, then the results of its dependencies. Dependency
          values and the return value are checked against the function's type
          hints where hints exist.

          Args:
              func: Function to execute
              pool: Agent pool for injection
              available_results: Results from previous functions
              inputs: Optional input overrides

          Returns:
              Tuple of (function name, result)

          Raises:
              ExecutionError: If execution fails
          """
          logger.debug("Executing %s", func.name)
          try:
              call_kwargs = dict(func.default_inputs)
              if inputs:
                  call_kwargs |= inputs

              type_hints = get_type_hints(func.func)

              # Feed in each dependency's result, type-checked when a hint exists
              for dep_name in func.depends_on:
                  if dep_name not in available_results:
                      msg = f"Missing result from {dep_name}"
                      raise ExecutionError(msg)  # noqa: TRY301

                  dep_value = available_results[dep_name]
                  if dep_name in type_hints:
                      _validate_value_type(
                          dep_value, type_hints[dep_name], func.name, dep_name
                      )
                  call_kwargs[dep_name] = dep_value

              # Node injection happens in the wrapper produced by with_nodes
              injected_call = with_nodes(pool)(func.func)
              outcome = await injected_call(**call_kwargs)

              if "return" in type_hints:
                  _validate_value_type(outcome, type_hints["return"], func.name, "return")
          except Exception as e:
              # Any failure, including the ones raised above, is reported uniformly
              msg = f"Error executing {func.name}: {e}"
              raise ExecutionError(msg) from e

          return func.name, outcome
      

      node_function

      node_function(
          func: Callable | None = None,
          *,
          deps: Any | None = None,
          depends_on: str | Sequence[str | Callable] | Callable | None = None,
      ) -> Callable
      

      Mark a function for automatic node execution.

      Can be used as simple decorator or with arguments:

      @node_function async def func(): ...

      @node_function(depends_on="other_func") async def func(): ...

      Parameters:

      Name Type Description Default
      func Callable | None

      Function to mark

      None
      deps Any | None

      Dependencies to inject into all Agent parameters

      None
      depends_on str | Sequence[str | Callable] | Callable | None

      Names of functions this one depends on (function objects are also accepted; their __name__ is used)

      None

      Returns:

      Type Description
      Callable

      Decorated function

      Source code in src/llmling_agent/running/discovery.py
       55
       56
       57
       58
       59
       60
       61
       62
       63
       64
       65
       66
       67
       68
       69
       70
       71
       72
       73
       74
       75
       76
       77
       78
       79
       80
       81
       82
       83
       84
       85
       86
       87
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      def node_function(
          func: Callable | None = None,
          *,
          deps: Any | None = None,
          depends_on: str | Sequence[str | Callable] | Callable | None = None,
      ) -> Callable:
          """Mark a function for automatic node execution.
      
          Can be used as simple decorator or with arguments:
      
          @node_function
          async def func(): ...
      
          @node_function(order=1, depends_on="other_func")
          async def func(): ...
      
          Args:
              func: Function to mark
              deps: Dependencies to inject into all Agent parameters
              depends_on: Names of functions this one depends on
      
          Returns:
              Decorated function
          """
      
          def decorator(func: Callable) -> Callable:
              match depends_on:
                  case None:
                      depends_on_ = []
                  case str():
                      depends_on_ = [depends_on]
                  case Callable():
                      depends_on_ = [depends_on.__name__]
      
                  case [*items]:
                      depends_on_ = [
                          i.__name__ if isinstance(i, Callable) else str(i)  # type: ignore[union-attr, arg-type]
                          for i in items
                      ]
                  case _:
                      msg = f"Invalid depends_on: {depends_on}"
                      raise ValueError(msg)
              # TODO: we still need to inject the deps in execution part.
              metadata = NodeFunction(func=func, depends_on=depends_on_ or [], deps=deps)
              func._node_function = metadata  # type: ignore
              return func
      
          return decorator(func) if func is not None else decorator
      

      run_nodes_async async

      run_nodes_async(
          config: JoinablePathLike | AgentsManifest,
          *,
          module: str | None = None,
          functions: list[str] | None = None,
          inputs: dict[str, Any] | None = None,
          parallel: bool = False,
      ) -> dict[str, Any]
      

      Execute node functions with dependency handling.

      Parameters:

      Name Type Description Default
      config JoinablePathLike | AgentsManifest

      Node configuration (path or manifest)

      required
      module str | None

      Optional module to discover functions from

      None
      functions list[str] | None

      Optional list of function names to run (auto-discovers if None)

      None
      inputs dict[str, Any] | None

      Optional input values for function parameters

      None
      parallel bool

      Whether to run independent functions in parallel

      False

      Returns:

      Type Description
      dict[str, Any]

      Dict mapping function names to their results

      Example
      @node_function
      async def analyze(analyzer: Agent) -> str:
          return await analyzer.run("...")
      
      results = await run_nodes_async("agents.yml")
      print(results["analyze"])
      
      Source code in src/llmling_agent/running/run_nodes.py
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      async def run_nodes_async(
          config: JoinablePathLike | AgentsManifest,
          *,
          module: str | None = None,
          functions: list[str] | None = None,
          inputs: dict[str, Any] | None = None,
          parallel: bool = False,
      ) -> dict[str, Any]:
          """Execute node functions with dependency handling.

          Args:
              config: Node configuration (path or manifest)
              module: Optional module to discover functions from
              functions: Optional list of function names to run (auto-discovers if None)
              inputs: Optional input values for function parameters
              parallel: Whether to run independent functions in parallel

          Returns:
              Dict mapping function names to their results

          Raises:
              RuntimeError: If no module is given and the calling module (or its
                  file) cannot be determined

          Example:
              ```python
              @node_function
              async def analyze(analyzer: Agent) -> str:
                  return await analyzer.run("...")

              results = await run_nodes_async("agents.yml")
              print(results["analyze"])
              ```
          """
          from llmling_agent import AgentPool

          # Find functions to run
          if module:
              discovered = discover_functions(module)
          else:
              # Use calling module: first frame up the stack outside this module
              import inspect

              frame = inspect.currentframe()
              try:
                  while frame and frame.f_globals.get("__name__") == __name__:
                      frame = frame.f_back
                  # __file__ is absent for interactive / exec'd callers
                  caller_file = frame.f_globals.get("__file__") if frame else None
              finally:
                  # Drop the frame reference to avoid a reference cycle
                  del frame
              if not caller_file:
                  msg = "Could not determine calling module"
                  raise RuntimeError(msg)
              discovered = discover_functions(caller_file)

          if functions:
              wanted = set(functions)
              discovered = [f for f in discovered if f.name in wanted]

          # Run with pool
          async with AgentPool[None](config) as pool:
              return await execute_functions(
                  discovered,
                  pool,
                  inputs=inputs,
                  parallel=parallel,
              )
      

      with_nodes

      with_nodes(
          pool: AgentPool,
      ) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]
      
      with_nodes(
          pool: AgentPool, func: Callable[P, Awaitable[T]]
      ) -> Callable[P, Awaitable[T]]
      
      with_nodes(
          pool: AgentPool, func: Callable[P, Awaitable[T]] | None = None
      ) -> (
          Callable[P, Awaitable[T]]
          | Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]
      )
      

      Inject nodes into function parameters.

      Source code in src/llmling_agent/running/decorators.py
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      def with_nodes[T, **P](
          pool: AgentPool,
          func: Callable[P, Awaitable[T]] | None = None,
      ) -> (
          Callable[P, Awaitable[T]]
          | Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]
      ):
          """Inject nodes into function parameters."""
      
          def decorator(func: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
              @wraps(func)
              async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
                  # Convert args to kwargs for injection check
                  sig = inspect.signature(func)
                  bound_args = sig.bind_partial(*args)
                  all_kwargs = {**bound_args.arguments, **kwargs}
      
                  # Get needed nodes
                  nodes = inject_nodes(func, pool, all_kwargs)
      
                  # Create kwargs with nodes first, then other args
                  final_kwargs = {**nodes, **kwargs}
      
                  # Convert back to args/kwargs using signature
                  bound = sig.bind(**final_kwargs)
                  bound.apply_defaults()
      
                  # Call with proper args/kwargs
                  return await func(*bound.args, **bound.kwargs)
      
              return wrapper
      
          return decorator(func) if func else decorator